diff --git a/lib/config.cpp b/lib/config.cpp index 2d2add05..30801d13 100644 --- a/lib/config.cpp +++ b/lib/config.cpp @@ -229,7 +229,7 @@ bool Util::Config::parseArgs(int & argc, char ** & argv) { #endif /*LTS-END*/ std::cout << "Built on " __DATE__ ", " __TIME__ << std::endl; - exit(1); + exit(0); break; default: jsonForEach(vals, it) { diff --git a/lib/dtsc.h b/lib/dtsc.h index 730d7d5d..ec69d611 100644 --- a/lib/dtsc.h +++ b/lib/dtsc.h @@ -19,6 +19,12 @@ #define DTSC_ARR 0x0A #define DTSC_CON 0xFF +//Increase this value every time the DTSH file format changes in an incompatible way +//Changelog: +// Version 0-2: Undocumented changes +// Version 3: switched to bigMeta-style by default, Parts layout switched from 3/2/4 to 3/3/3 bytes +#define DTSH_VERSION 3 + namespace DTSC { ///\brief This enum holds all possible datatypes for DTSC packets. @@ -125,6 +131,7 @@ namespace DTSC { int getDataLen() const; int getPayloadLen() const; JSON::Value toJSON() const; + std::string toSummary() const; Scan getScan() const; protected: bool master; @@ -193,21 +200,22 @@ namespace DTSC { ///\brief Basic class for storage of data associated with single DTSC packets, a.k.a. parts. class Part { public: - long getSize(); - void setSize(long newSize); - short getDuration(); - void setDuration(short newDuration); - long getOffset(); - void setOffset(long newOffset); + uint32_t getSize(); + void setSize(uint32_t newSize); + uint32_t getDuration(); + void setDuration(uint32_t newDuration); + uint32_t getOffset(); + void setOffset(uint32_t newOffset); char * getData(); void toPrettyString(std::ostream & str, int indent = 0); private: +#define PACKED_PART_SIZE 9 ///\brief Data storage for this Part. /// /// - 3 bytes: MSB storage of the payload size of this packet in bytes. - /// - 2 bytes: MSB storage of the duration of this packet in milliseconds. - /// - 4 bytes: MSB storage of the presentation time offset of this packet in milliseconds. - char data[9]; + /// - 3 bytes: MSB storage of the duration of this packet in milliseconds. + /// - 3 bytes: MSB storage of the presentation time offset of this packet in milliseconds. + char data[PACKED_PART_SIZE]; }; ///\brief Basic class for storage of data associated with keyframes. @@ -228,7 +236,6 @@ namespace DTSC { char * getData(); void toPrettyString(std::ostream & str, int indent = 0); private: -#ifdef BIGMETA #define PACKED_KEY_SIZE 25 ///\brief Data storage for this Key. /// @@ -237,16 +244,6 @@ namespace DTSC { /// - 4 bytes: MSB storage of the number of this keyframe. /// - 2 bytes: MSB storage of the amount of parts in this keyframe. /// - 8 bytes: MSB storage of the timestamp associated with this keyframe's first packet. -#else -#define PACKED_KEY_SIZE 16 - ///\brief Data storage for this Key. - /// - /// - 5 bytes: MSB storage of the position of the first packet of this keyframe within the file. - /// - 3 bytes: MSB storage of the duration of this keyframe. - /// - 2 bytes: MSB storage of the number of this keyframe. - /// - 2 bytes: MSB storage of the amount of parts in this keyframe. - /// - 4 bytes: MSB storage of the timestamp associated with this keyframe's first packet. -#endif char data[PACKED_KEY_SIZE]; }; @@ -264,7 +261,6 @@ namespace DTSC { char * getData(); void toPrettyString(std::ostream & str, int indent = 0); private: -#ifdef BIGMETA #define PACKED_FRAGMENT_SIZE 13 ///\brief Data storage for this Fragment. /// @@ -272,15 +268,6 @@ namespace DTSC { /// - 1 byte: length (amount of keyframes) /// - 4 bytes: number of first keyframe in fragment /// - 4 bytes: size of fragment in bytes -#else -#define PACKED_FRAGMENT_SIZE 11 - ///\brief Data storage for this Fragment. - /// - /// - 4 bytes: duration (in milliseconds) - /// - 1 byte: length (amount of keyframes) - /// - 2 bytes: number of first keyframe in fragment - /// - 4 bytes: size of fragment in bytes -#endif char data[PACKED_FRAGMENT_SIZE]; }; @@ -362,12 +349,14 @@ namespace DTSC { void writeTo(char * p); JSON::Value toJSON(); void reset(); + bool toFile(const std::string & fileName); void toPrettyString(std::ostream & str, int indent = 0, int verbosity = 0); //members: std::map tracks; bool vod; bool live; bool merged; + uint16_t version; long long int moreheader; long long int bufferWindow; }; diff --git a/lib/dtscmeta.cpp b/lib/dtscmeta.cpp index c4cf8b48..83b5e8f2 100644 --- a/lib/dtscmeta.cpp +++ b/lib/dtscmeta.cpp @@ -4,6 +4,7 @@ #include #include #include +#include #define AUDIO_KEY_INTERVAL 5000 ///< This define controls the keyframe interval for non-video tracks, such as audio and metadata tracks. @@ -437,6 +438,18 @@ namespace DTSC { return result; } + std::string Packet::toSummary() const { + std::stringstream out; + char * res = 0; + unsigned int len = 0; + getString("data", res, len); + out << getTrackId() << "@" << getTime() << ": " << len << " bytes"; + if (hasMember("keyframe")){ + out << " (keyframe)"; + } + return out.str(); + } + /// Create an invalid DTSC::Scan object by default. Scan::Scan() { p = 0; @@ -870,33 +883,33 @@ namespace DTSC { /*LTS-END*/ ///\brief Returns the payloadsize of a part - long Part::getSize() { + uint32_t Part::getSize() { return Bit::btoh24(data); } ///\brief Sets the payloadsize of a part - void Part::setSize(long newSize) { + void Part::setSize(uint32_t newSize) { Bit::htob24(data, newSize); } - ///\brief Retruns the duration of a part - short Part::getDuration() { - return Bit::btohs(data + 3); + ///\brief Returns the duration of a part + uint32_t Part::getDuration() { + return Bit::btoh24(data + 3); } ///\brief Sets the duration of a part - void Part::setDuration(short newDuration) { - Bit::htobs(data + 3, newDuration); + void Part::setDuration(uint32_t newDuration) { + Bit::htob24(data + 3, newDuration); } ///\brief returns the offset of a part - long Part::getOffset() { - return Bit::btohl(data + 5); + uint32_t Part::getOffset() { + return Bit::btoh24(data + 6); } ///\brief Sets the offset of a part - void Part::setOffset(long newOffset) { - Bit::htobl(data + 5, newOffset); + void Part::setOffset(uint32_t newOffset) { + Bit::htob24(data + 6, newOffset); } ///\brief Returns the data of a part @@ -913,93 +926,49 @@ namespace DTSC { ///\brief Returns the byteposition of a keyframe unsigned long long Key::getBpos() { -#ifdef BIGMETA return Bit::btohll(data); -#else - return (((unsigned long long)data[0] << 32) | (data[1] << 24) | (data[2] << 16) | (data[3] << 8) | data[4]); -#endif } void Key::setBpos(unsigned long long newBpos) { -#ifdef BIGMETA Bit::htobll(data, newBpos); -#else - data[4] = newBpos & 0xFF; - data[3] = (newBpos >> 8) & 0xFF; - data[2] = (newBpos >> 16) & 0xFF; - data[1] = (newBpos >> 24) & 0xFF; - data[0] = (newBpos >> 32) & 0xFF; -#endif } unsigned long Key::getLength() { -#ifdef BIGMETA return Bit::btoh24(data+8); -#else - return Bit::btoh24(data+5); -#endif } void Key::setLength(unsigned long newLength) { -#ifdef BIGMETA Bit::htob24(data+8, newLength); -#else - Bit::htob24(data+5, newLength); -#endif } ///\brief Returns the number of a keyframe unsigned long Key::getNumber() { -#ifdef BIGMETA return Bit::btohl(data + 11); -#else - return Bit::btohs(data + 8); -#endif } ///\brief Sets the number of a keyframe void Key::setNumber(unsigned long newNumber) { -#ifdef BIGMETA Bit::htobl(data + 11, newNumber); -#else - Bit::htobs(data + 8, newNumber); -#endif } ///\brief Returns the number of parts of a keyframe unsigned short Key::getParts() { -#ifdef BIGMETA return Bit::btohs(data + 15); -#else - return Bit::btohs(data + 10); -#endif } ///\brief Sets the number of parts of a keyframe void Key::setParts(unsigned short newParts) { -#ifdef BIGMETA Bit::htobs(data + 15, newParts); -#else - Bit::htobs(data + 10, newParts); -#endif } ///\brief Returns the timestamp of a keyframe unsigned long long Key::getTime() { -#ifdef BIGMETA return Bit::btohll(data + 17); -#else - return Bit::btohl(data + 12); -#endif } ///\brief Sets the timestamp of a keyframe void Key::setTime(unsigned long long newTime) { -#ifdef BIGMETA Bit::htobll(data + 17, newTime); -#else - Bit::htobl(data + 12, newTime); -#endif } ///\brief Returns the data of this keyframe struct @@ -1036,38 +1005,22 @@ namespace DTSC { ///\brief Returns the number of the first keyframe in this fragment unsigned long Fragment::getNumber() { -#ifdef BIGMETA return Bit::btohl(data + 5); -#else - return Bit::btohs(data + 5); -#endif } ///\brief Sets the number of the first keyframe in this fragment void Fragment::setNumber(unsigned long newNumber) { -#ifdef BIGMETA Bit::htobl(data + 5, newNumber); -#else - Bit::htobs(data + 5, newNumber); -#endif } ///\brief Returns the size of a fragment unsigned long Fragment::getSize() { -#ifdef BIGMETA return Bit::btohl(data + 9); -#else - return Bit::btohl(data + 7); -#endif } ///\brief Sets the size of a fragement void Fragment::setSize(unsigned long newSize) { -#ifdef BIGMETA Bit::htobl(data + 9, newSize); -#else - Bit::htobl(data + 7, newSize); -#endif } ///\brief Returns thte data of this fragment structure @@ -1348,6 +1301,7 @@ namespace DTSC { Meta::Meta() { vod = false; live = false; + version = DTSH_VERSION; moreheader = 0; merged = false; bufferWindow = 0; @@ -1361,6 +1315,7 @@ namespace DTSC { tracks.clear(); vod = source.getFlag("vod"); live = source.getFlag("live"); + version = source.getInt("version"); merged = source.getFlag("merged"); bufferWindow = source.getInt("buffer_window"); moreheader = source.getInt("moreheader"); @@ -1383,6 +1338,7 @@ namespace DTSC { Meta::Meta(JSON::Value & meta) { vod = meta.isMember("vod") && meta["vod"]; live = meta.isMember("live") && meta["live"]; + version = meta.isMember("version") ? meta["version"].asInt() : 0; merged = meta.isMember("merged") && meta["merged"]; bufferWindow = 0; if (meta.isMember("buffer_window")) { @@ -1798,6 +1754,7 @@ namespace DTSC { dataLen += it->second.getSendLen(skipDynamic); } } + if (version){dataLen += 17;} return dataLen + 8; //add 8 bytes header } @@ -1823,6 +1780,10 @@ namespace DTSC { writePointer(p, "\000\006merged\001", 9); writePointer(p, convertLongLong(1), 8); } + if (version) { + writePointer(p, "\000\006version\001", 9); + writePointer(p, convertLongLong(version), 8); + } if (bufferWindow) { writePointer(p, "\000\015buffer_window\001", 16); writePointer(p, convertLongLong(bufferWindow), 8); @@ -1856,6 +1817,10 @@ namespace DTSC { conn.SendNow("\000\006merged\001", 9); conn.SendNow(convertLongLong(1), 8); } + if (version) { + conn.SendNow("\000\006version\001", 9); + conn.SendNow(convertLongLong(version), 8); + } if (bufferWindow) { conn.SendNow("\000\015buffer_window\001", 16); conn.SendNow(convertLongLong(bufferWindow), 8); @@ -1948,10 +1913,20 @@ namespace DTSC { if (bufferWindow) { result["buffer_window"] = bufferWindow; } + if (version) { + result["version"] = (long long)version; + } result["moreheader"] = moreheader; return result; } + ///\brief Writes metadata to a filename. Wipes existing contents, if any. + bool Meta::toFile(const std::string & fileName){ + std::ofstream oFile(fileName.c_str()); + oFile << toJSON().toNetPacked(); + oFile.close(); + } + ///\brief Converts a meta object to a human readable string ///\param str The stringstream to append to ///\param indent the amount of indentation needed diff --git a/lib/procs.h b/lib/procs.h index b74ed258..2c07cda8 100644 --- a/lib/procs.h +++ b/lib/procs.h @@ -17,17 +17,17 @@ namespace Util { private: static bool childRunning(pid_t p); static tthread::mutex plistMutex; - static tthread::thread * reaper_thread; static std::set plist; ///< Holds active process list. - static bool handler_set; ///< If true, the sigchld handler has been setup. static bool thread_handler;///< True while thread handler should be running. static void childsig_handler(int signum); static void exit_handler(); static void runCmd(std::string & cmd); - static void setHandler(); static char* const* dequeToArgv(std::deque & argDeq); static void grim_reaper(void * n); public: + static tthread::thread * reaper_thread; + static bool handler_set; ///< If true, the sigchld handler has been setup. + static void setHandler(); static std::string getOutputOf(char * const * argv); static std::string getOutputOf(std::deque & argDeq); static pid_t StartPiped(char * const * argv, int * fdin, int * fdout, int * fderr); diff --git a/lib/rtmpchunks.cpp b/lib/rtmpchunks.cpp index ce83ccdc..c1d9582f 100644 --- a/lib/rtmpchunks.cpp +++ b/lib/rtmpchunks.cpp @@ -537,9 +537,14 @@ bool RTMPStream::doHandshake() { Secure::hmac_sha256bin(pTempBuffer, 1504, genuineFMSKey, 36, (char*)Server + serverDigestOffset); //SECOND 1536 bytes for server response - char pTempHash[32]; - Secure::hmac_sha256bin((char*)Client + keyChallengeIndex, 32, genuineFMSKey, 68, pTempHash); - Secure::hmac_sha256bin((char*)Server + 1536, 1536 - 32, pTempHash, 32, (char*)Server + 1536 * 2 - 32); + if (_validationScheme == 5 && Version == 3){ + //copy exactly from client + memcpy(Server+1536, Client, 1536); + }else{ + char pTempHash[32]; + Secure::hmac_sha256bin((char*)Client + keyChallengeIndex, 32, genuineFMSKey, 68, pTempHash); + Secure::hmac_sha256bin((char*)Server + 1536, 1536 - 32, pTempHash, 32, (char*)Server + 1536 * 2 - 32); + } Server[ -1] = Version; RTMPStream::snd_cnt += 3073; diff --git a/lib/shared_memory.cpp b/lib/shared_memory.cpp index f78275db..e0b8dcf8 100644 --- a/lib/shared_memory.cpp +++ b/lib/shared_memory.cpp @@ -22,6 +22,11 @@ #endif +/// Forces a disconnect to all users. +static void killStatistics(char * data, size_t len, unsigned int id){ + (*(data - 1)) = 60 | ((*(data - 1))&0x80);//Send disconnect message; +} + namespace IPC { #if defined(__CYGWIN__) || defined(_WIN32) @@ -776,7 +781,6 @@ namespace IPC { ///\brief The deconstructor sharedServer::~sharedServer() { - finishEach(); mySemaphore.close(); mySemaphore.unlink(); } @@ -833,21 +837,16 @@ namespace IPC { return false; } - ///Disconnect all connected users + ///Disconnect all connected users, waits at most 2.5 seconds until completed void sharedServer::finishEach(){ if (!hasCounter){ return; } - for (std::set::iterator it = myPages.begin(); it != myPages.end(); it++) { - if (!it->mapped || !it->len) { - break; - } - unsigned int offset = 0; - while (offset + payLen + (hasCounter ? 1 : 0) <= it->len) { - it->mapped[offset] = 126; - offset += payLen + (hasCounter ? 1 : 0); - } - } + unsigned int c = 0;//to prevent eternal loops + do{ + parseEach(killStatistics); + Util::wait(250); + }while(amount && c++ < 10); } ///Returns a pointer to the data for the given index. @@ -917,17 +916,18 @@ namespace IPC { if (*counter & 0x80){ connectedUsers++; } + char countNum = (*counter) & 0x7F; if (id >= amount) { amount = id + 1; VERYHIGH_MSG("Shared memory %s is now at count %u", baseName.c_str(), amount); } uint32_t tmpPID = *((uint32_t *)(it->mapped + 1 + offset + payLen - 4)); - if (!Util::Procs::isRunning(tmpPID) && !(*counter == 126 || *counter == 127)){ + if (!Util::Procs::isRunning(tmpPID) && !(countNum == 126 || countNum == 127)){ WARN_MSG("process disappeared, timing out. (pid %lu)", tmpPID); - *counter = 126; //if process is already dead, instant timeout. + *counter = 125 | (0x80 & (*counter)); //if process is already dead, instant timeout. } callback(it->mapped + offset + 1, payLen, id); - switch (*counter) { + switch (countNum) { case 127: HIGH_MSG("Client %u requested disconnect", id); break; @@ -937,9 +937,9 @@ namespace IPC { default: #ifndef NOCRASHCHECK if (tmpPID) { - if (*counter > 10 && *counter < 126) { - if (*counter < 30) { - if (*counter > 15) { + if (countNum > 10 && countNum < 60) { + if (countNum < 30) { + if (countNum > 15) { WARN_MSG("Process %d is unresponsive", tmpPID); } Util::Procs::Stop(tmpPID); //soft kill @@ -948,11 +948,22 @@ namespace IPC { Util::Procs::Murder(tmpPID); //improved kill } } + if (countNum > 70) { + if (countNum < 90) { + if (countNum > 75) { + WARN_MSG("Stopping process %d is unresponsive", tmpPID); + } + Util::Procs::Stop(tmpPID); //soft kill + } else { + ERROR_MSG("Killing unresponsive stopping process %d", tmpPID); + Util::Procs::Murder(tmpPID); //improved kill + } + } } #endif break; } - if (*counter == 127 || *counter == 126){ + if (countNum == 127 || countNum == 126){ memset(it->mapped + offset + 1, 0, payLen); it->mapped[offset] = 0; } else { @@ -1153,7 +1164,7 @@ namespace IPC { } if (myPage.mapped) { semGuard tmpGuard(&mySemaphore); - myPage.mapped[offsetOnPage] = 126; + myPage.mapped[offsetOnPage] = 126 | (countAsViewer?0x80:0); HIGH_MSG("sharedClient finished ID %d", offsetOnPage/(payLen+1)); } } @@ -1164,16 +1175,19 @@ namespace IPC { DEBUG_MSG(DLVL_WARN, "Trying to keep-alive an element without counters"); return; } - if ((myPage.mapped[offsetOnPage] & 0x7F) < 126) { + if (isAlive()){ myPage.mapped[offsetOnPage] = (countAsViewer ? 0x81 : 0x01); } } bool sharedClient::isAlive() { if (!hasCounter) { - return true; + return (myPage.mapped != 0); } - return (myPage.mapped[offsetOnPage] & 0x7F) < 126; + if (myPage.mapped){ + return (myPage.mapped[offsetOnPage] & 0x7F) < 60; + } + return false; } ///\brief Get a pointer to the data of this client diff --git a/lib/shared_memory.h b/lib/shared_memory.h index bc8906f2..920ca1b8 100644 --- a/lib/shared_memory.h +++ b/lib/shared_memory.h @@ -186,6 +186,7 @@ namespace IPC { ///\brief The amount of connected clients unsigned int amount; unsigned int connectedUsers; + void finishEach(); private: bool isInUse(unsigned int id); void newPage(); @@ -200,7 +201,6 @@ namespace IPC { semaphore mySemaphore; ///\brief Whether the payload has a counter, if so, it is added in front of the payload bool hasCounter; - void finishEach(); }; ///\brief The client part of a server/client model for shared memory. diff --git a/src/controller/controller.cpp b/src/controller/controller.cpp index 8ac9513f..1f1c5569 100644 --- a/src/controller/controller.cpp +++ b/src/controller/controller.cpp @@ -34,6 +34,7 @@ #include #include #include +#include #include #include #include @@ -95,6 +96,7 @@ void createAccount (std::string account){ } } + /// Status monitoring thread. /// Will check outputs, inputs and converters every five seconds void statusMonitor(void * np){ @@ -138,15 +140,14 @@ void statusMonitor(void * np){ configLock.unlink(); } -///\brief The main entry point for the controller. +///\brief The main loop for the controller. /// /// \triggers /// The `"SYSTEM_STOP"` trigger is global, and is ran when the controller shuts down. If cancelled, the controller does not shut down and will attempt to re-open the API socket. Its payload is: /// ~~~~~~~~~~~~~~~ /// shutdown reason /// ~~~~~~~~~~~~~~~ -int main(int argc, char ** argv){ - +int main_loop(int argc, char ** argv){ Controller::Storage = JSON::fromFile("config.json"); JSON::Value stored_port = JSON::fromString("{\"long\":\"port\", \"short\":\"p\", \"arg\":\"integer\", \"help\":\"TCP port to listen on.\"}"); stored_port["default"] = Controller::Storage["config"]["controller"]["port"]; @@ -163,7 +164,6 @@ int main(int argc, char ** argv){ if ( !stored_user["default"]){ stored_user["default"] = "root"; } - Controller::conf = Util::Config(argv[0]); Controller::conf.addOption("port", stored_port); Controller::conf.addOption("interface", stored_interface); Controller::conf.addOption("username", stored_user); @@ -205,6 +205,7 @@ int main(int argc, char ** argv){ if (pipe(pipeErr) >= 0){ dup2(pipeErr[1], STDERR_FILENO);//cause stderr to write to the pipe close(pipeErr[1]);//close the unneeded pipe file descriptor + Util::Procs::socketList.insert(pipeErr[0]); tthread::thread msghandler(Controller::handleMsg, (void*)(((char*)0) + pipeErr[0])); msghandler.detach(); } @@ -262,7 +263,7 @@ int main(int argc, char ** argv){ } }else if(yna(in_string) == 'a'){ //abort controller startup - return 0; + return 1; } } } @@ -284,7 +285,7 @@ int main(int argc, char ** argv){ } }else if(yna(in_string) == 'a'){ //abort controller startup - return 0; + return 1; } } } @@ -334,10 +335,10 @@ int main(int argc, char ** argv){ }else{ shutdown_reason = "socket problem (API port closed)"; } - /*LTS-START*/ if (Controller::restarting){ - shutdown_reason = "update (on request)"; + shutdown_reason = "restart (on request)"; } + /*LTS-START*/ if(Triggers::shouldTrigger("SYSTEM_STOP")){ if (!Triggers::doTrigger("SYSTEM_STOP", shutdown_reason)){ Controller::conf.is_active = true; @@ -348,8 +349,10 @@ int main(int argc, char ** argv){ Controller::Log("CONF", "Controller shutting down because of "+shutdown_reason); } }else{ + /*LTS-END*/ Controller::conf.is_active = false; Controller::Log("CONF", "Controller shutting down because of "+shutdown_reason); + /*LTS-START*/ } }//indentation intentionally wrong, to minimize Pro/nonPro diffs /*LTS-END*/ @@ -374,16 +377,85 @@ int main(int argc, char ** argv){ Util::Procs::StopAll(); //give everything some time to print messages Util::wait(100); + std::cout << "Killed all processes, wrote config to disk. Exiting." << std::endl; + if (Controller::restarting){ + return 42; + } //close stderr to make the stderr reading thread exit close(STDERR_FILENO); - std::cout << "Killed all processes, wrote config to disk. Exiting." << std::endl; - /*LTS-START*/ - if (Controller::restarting){ - std::string myFile = Util::getMyPath() + "MistController"; - execvp(myFile.c_str(), argv); - std::cout << "Error restarting: " << strerror(errno) << std::endl; - } - /*LTS-END*/ + return 0; +} + +void handleUSR1(int signum, siginfo_t * sigInfo, void * ignore){ + Controller::Log("CONF", "USR1 received - restarting controller"); + Controller::restarting = true; + raise(SIGINT); //trigger restart +} + +///\brief The controller angel process. +///Starts a forked main_loop in a loop. Yes, you read that right. +int main(int argc, char ** argv){ + Util::Procs::setHandler();//set child handler + { + struct sigaction new_action; + struct sigaction cur_action; + new_action.sa_sigaction = handleUSR1; + sigemptyset(&new_action.sa_mask); + new_action.sa_flags = 0; + sigaction(SIGUSR1, &new_action, NULL); + } + + Controller::conf = Util::Config(argv[0]); + Controller::conf.activate(); + uint64_t reTimer = 0; + while (Controller::conf.is_active){ + pid_t pid = fork(); + if (pid == 0){ + Util::Procs::handler_set = false; + Util::Procs::reaper_thread = 0; + { + struct sigaction new_action; + struct sigaction cur_action; + new_action.sa_sigaction = handleUSR1; + sigemptyset(&new_action.sa_mask); + new_action.sa_flags = 0; + sigaction(SIGUSR1, &new_action, NULL); + } + return main_loop(argc, argv); + } + if (pid == -1){ + FAIL_MSG("Unable to spawn controller process!"); + return 2; + } + //wait for the process to exit + int status; + while (waitpid(pid, &status, 0) != pid && errno == EINTR){ + if (Controller::restarting){ + Controller::conf.is_active = true; + Controller::restarting = false; + kill(pid, SIGUSR1); + } + if (!Controller::conf.is_active){ + INFO_MSG("Shutting down controller because of signal interrupt..."); + Util::Procs::Stop(pid); + } + continue; + } + //if the exit was clean, don't restart it + if (WIFEXITED(status) && (WEXITSTATUS(status) == 0)){ + MEDIUM_MSG("Controller shut down cleanly"); + break; + } + if (WIFEXITED(status) && (WEXITSTATUS(status) == 42)){ + WARN_MSG("Refreshing angel process for update"); + std::string myFile = Util::getMyPath() + "MistController"; + execvp(myFile.c_str(), argv); + FAIL_MSG("Error restarting: %s", strerror(errno)); + } + INFO_MSG("Controller uncleanly shut down! Restarting in %llu...", reTimer); + Util::wait(reTimer); + reTimer += 1000; + } return 0; } diff --git a/src/controller/controller_storage.cpp b/src/controller/controller_storage.cpp index 26e0cbe4..a245f4cf 100644 --- a/src/controller/controller_storage.cpp +++ b/src/controller/controller_storage.cpp @@ -18,6 +18,7 @@ namespace Controller { tthread::mutex logMutex; unsigned long long logCounter = 0; bool configChanged = false; + bool restarting = false; ///\brief Store and print a log message. ///\param kind The type of message. diff --git a/src/controller/controller_storage.h b/src/controller/controller_storage.h index 00e28174..37dc1782 100644 --- a/src/controller/controller_storage.h +++ b/src/controller/controller_storage.h @@ -9,6 +9,7 @@ namespace Controller { extern tthread::mutex logMutex;///< Mutex for log thread. extern tthread::mutex configMutex;///< Mutex for server config access. extern bool configChanged; ///< Bool that indicates config must be written to SHM. + extern bool restarting;///< Signals if the controller is shutting down (false) or restarting (true). extern unsigned long long logCounter; ///getString("input") + ".dtsh"); + if (!tmpdtsh){ + return false; + } + if (tmpdtsh.getMeta().version != DTSH_VERSION){ + INFO_MSG("Updating wrong version header file from version %llu to %llu", tmpdtsh.getMeta().version, DTSH_VERSION); + return false; + } + myMeta = tmpdtsh.getMeta(); + return true; + } + } diff --git a/src/input/input.h b/src/input/input.h index e46a6bdd..8e0c3887 100644 --- a/src/input/input.h +++ b/src/input/input.h @@ -29,6 +29,7 @@ namespace Mist { static void callbackWrapper(char * data, size_t len, unsigned int id); virtual bool setup() = 0; virtual bool readHeader() = 0; + virtual bool readExistingHeader(); virtual bool atKeyFrame(); virtual void getNext(bool smart = true) {}; virtual void seek(int seekTime){}; diff --git a/src/input/input_buffer.cpp b/src/input/input_buffer.cpp index bef15894..9f3a3359 100644 --- a/src/input/input_buffer.cpp +++ b/src/input/input_buffer.cpp @@ -526,6 +526,7 @@ namespace Mist { } else if (everHadPush && !resumeMode && config->is_active) { INFO_MSG("Shutting down buffer because resume mode is disabled and the source disconnected"); config->is_active = false; + userPage.finishEach(); } } diff --git a/src/input/input_flv.cpp b/src/input/input_flv.cpp index 66aa218b..53412c9c 100644 --- a/src/input/input_flv.cpp +++ b/src/input/input_flv.cpp @@ -49,23 +49,13 @@ namespace Mist { } bool inputFLV::readHeader() { - JSON::Value lastPack; - if (!inFile) { - return false; - } + if (!inFile){return false;} //See whether a separate header file exists. - DTSC::File tmp(config->getString("input") + ".dtsh"); - if (tmp){ - myMeta = tmp.getMeta(); - if (myMeta){ - return true; - }else{ - myMeta = DTSC::Meta(); - } - } + if (readExistingHeader()){return true;} //Create header file from FLV data fseek(inFile, 13, SEEK_SET); AMF::Object amf_storage; + JSON::Value lastPack; long long int lastBytePos = 13; while (!feof(inFile) && !FLV::Parse_Error){ if (tmpTag.FileLoader(inFile)){ @@ -80,9 +70,7 @@ namespace Mist { std::cerr << FLV::Error_Str << std::endl; return false; } - std::ofstream oFile(std::string(config->getString("input") + ".dtsh").c_str()); - oFile << myMeta.toJSON().toNetPacked(); - oFile.close(); + myMeta.toFile(config->getString("input") + ".dtsh"); return true; } diff --git a/src/input/input_mp3.cpp b/src/input/input_mp3.cpp index 7cf0b21d..3307923c 100644 --- a/src/input/input_mp3.cpp +++ b/src/input/input_mp3.cpp @@ -47,17 +47,9 @@ namespace Mist { } bool inputMP3::readHeader() { - if (!inFile) { - return false; - } + if (!inFile){return false;} //See whether a separate header file exists. - DTSC::File tmp(config->getString("input") + ".dtsh"); - if (tmp){ - myMeta = tmp.getMeta(); - if (myMeta){ - return true; - } - } + if (readExistingHeader()){return true;} myMeta = DTSC::Meta(); myMeta.tracks[1].trackID = 1; myMeta.tracks[1].type = "audio"; @@ -93,9 +85,7 @@ namespace Mist { fseek(inFile, 0, SEEK_SET); timestamp = 0; - std::ofstream oFile(std::string(config->getString("input") + ".dtsh").c_str()); - oFile << myMeta.toJSON().toNetPacked(); - oFile.close(); + myMeta.toFile(config->getString("input") + ".dtsh"); return true; } diff --git a/src/input/input_ogg.cpp b/src/input/input_ogg.cpp index 06e20f6a..c5a95237 100644 --- a/src/input/input_ogg.cpp +++ b/src/input/input_ogg.cpp @@ -222,11 +222,7 @@ namespace Mist { getNext(); } - std::ofstream oFile(std::string(config->getString("input") + ".dtsh").c_str()); - oFile << myMeta.toJSON().toNetPacked(); - oFile.close(); - - //myMeta.toPrettyString(std::cout); + myMeta.toFile(config->getString("input") + ".dtsh"); return true; } diff --git a/src/output/output.cpp b/src/output/output.cpp index 5cfc3a89..19f10d94 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -73,6 +73,10 @@ namespace Mist { } void Output::updateMeta(){ + //cancel if not alive + if (!nProxy.userClient.isAlive()){ + return; + } //read metadata from page to myMeta variable if (nProxy.metaPages[0].mapped){ IPC::semaphore * liveSem = 0; @@ -249,22 +253,30 @@ namespace Mist { statsPage.finish(); myConn.resetCounter(); } - statsPage = IPC::sharedClient(SHM_STATISTICS, STAT_EX_SIZE, true); if (nProxy.userClient.getData()){ nProxy.userClient.finish(); } char userPageName[NAME_BUFFER_SIZE]; snprintf(userPageName, NAME_BUFFER_SIZE, SHM_USERS, streamName.c_str()); - nProxy.userClient = IPC::sharedClient(userPageName, PLAY_EX_SIZE, true); + unsigned int attempts = 0; + while (!nProxy.userClient.isAlive() && ++attempts < 20 && Util::streamAlive(streamName)){ + nProxy.userClient = IPC::sharedClient(userPageName, PLAY_EX_SIZE, true); + } + if (!nProxy.userClient.isAlive()){ + FAIL_MSG("Could not register as client for %s", streamName.c_str()); + onFail(); + return; + } char pageId[NAME_BUFFER_SIZE]; snprintf(pageId, NAME_BUFFER_SIZE, SHM_STREAM_INDEX, streamName.c_str()); nProxy.metaPages.clear(); nProxy.metaPages[0].init(pageId, DEFAULT_STRM_PAGE_SIZE); if (!nProxy.metaPages[0].mapped){ - FAIL_MSG("Could not connect to server for %s", streamName.c_str()); + FAIL_MSG("Could not connect to data for %s", streamName.c_str()); onFail(); return; } + statsPage = IPC::sharedClient(SHM_STATISTICS, STAT_EX_SIZE, true); stats(true); updateMeta(); selectDefaultTracks(); @@ -490,8 +502,12 @@ namespace Mist { } void Output::loadPageForKey(long unsigned int trackId, long long int keyNum){ + if (!myMeta.tracks.count(trackId) || !myMeta.tracks[trackId].keys.size()){ + WARN_MSG("Load for track %lu key %lld aborted - track is empty", trackId, keyNum); + return; + } if (myMeta.vod && keyNum > myMeta.tracks[trackId].keys.rbegin()->getNumber()){ - INFO_MSG("Seek in track %lu to key %lld aborted, is > %lld", trackId, keyNum, myMeta.tracks[trackId].keys.rbegin()->getNumber()); + INFO_MSG("Load for track %lu key %lld aborted, is > %lld", trackId, keyNum, myMeta.tracks[trackId].keys.rbegin()->getNumber()); nProxy.curPage.erase(trackId); currKeyOpen.erase(trackId); return; @@ -1229,25 +1245,28 @@ namespace Mist { //check where the next key is nxtKeyNum[nxt.tid] = getKeyForTime(nxt.tid, thisPacket.getTime()); int nextPage = pageNumForKey(nxt.tid, nxtKeyNum[nxt.tid]+1); - //are we live, and the next key hasn't shown up on another page, then we're waiting. - if (myMeta.live && currKeyOpen.count(nxt.tid) && (currKeyOpen[nxt.tid] == (unsigned int)nextPage || nextPage == -1)){ + //if the next key hasn't shown up on another page, then we're waiting. + //VoD might be slow, so we check VoD case also, just in case + if (currKeyOpen.count(nxt.tid) && (currKeyOpen[nxt.tid] == (unsigned int)nextPage || nextPage == -1)){ if (++emptyCount < 100){ Util::wait(250); //we're waiting for new data to show up if (emptyCount % 64 == 0){ reconnect();//reconnect every 16 seconds }else{ - if (emptyCount % 4 == 0){ + //updating meta is only useful with live streams + if (myMeta.live && emptyCount % 4 == 0){ updateMeta(); } } }else{ //after ~25 seconds, give up and drop the track. - dropTrack(nxt.tid, "EOP: could not reload empty packet"); + dropTrack(nxt.tid, "EOP: data wait timeout"); } return false; } + //The next key showed up on another page! //We've simply reached the end of the page. Load the next key = next page. loadPageForKey(nxt.tid, ++nxtKeyNum[nxt.tid]); nxt.offset = 0; @@ -1297,6 +1316,10 @@ namespace Mist { //when live, every keyframe, check correctness of the keyframe number if (myMeta.live && thisPacket.getFlag("keyframe")){ + //cancel if not alive + if (!nProxy.userClient.isAlive()){ + return false; + } //Check whether returned keyframe is correct. If not, wait for approximately 10 seconds while checking. //Failure here will cause tracks to drop due to inconsistent internal state. nxtKeyNum[nxt.tid] = getKeyForTime(nxt.tid, thisPacket.getTime()); diff --git a/src/output/output_rtmp.cpp b/src/output/output_rtmp.cpp index db847249..18e36d5f 100644 --- a/src/output/output_rtmp.cpp +++ b/src/output/output_rtmp.cpp @@ -586,6 +586,16 @@ namespace Mist { void OutRTMP::parseAMFCommand(AMF::Object & amfData, int messageType, int streamId) { MEDIUM_MSG("Received command: %s", amfData.Print().c_str()); HIGH_MSG("AMF0 command: %s", amfData.getContentP(0)->StrValue().c_str()); + if (amfData.getContentP(0)->StrValue() == "xsbwtest") { + //send a _result reply + AMF::Object amfReply("container", AMF::AMF0_DDV_CONTAINER); + amfReply.addContent(AMF::Object("", "_error")); //result success + amfReply.addContent(amfData.getContent(1)); //same transaction ID + amfReply.addContent(AMF::Object("", amfData.getContentP(0)->StrValue())); //null - command info + amfReply.addContent(AMF::Object("", "Hai XSplit user!")); //stream ID? + sendCommand(amfReply, messageType, streamId); + return; + } if (amfData.getContentP(0)->StrValue() == "connect") { double objencoding = 0; if (amfData.getContentP(2)->getContentP("objectEncoding")) {