From 357eb4e722961cf15054f13b4c896e322982e26d Mon Sep 17 00:00:00 2001 From: Thulinma Date: Tue, 6 Sep 2016 10:24:11 +0200 Subject: [PATCH 01/10] Fixed strict compliance V3 RTMP clients (e.g. Mishira) --- lib/rtmpchunks.cpp | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/lib/rtmpchunks.cpp b/lib/rtmpchunks.cpp index ce83ccdc..c1d9582f 100644 --- a/lib/rtmpchunks.cpp +++ b/lib/rtmpchunks.cpp @@ -537,9 +537,14 @@ bool RTMPStream::doHandshake() { Secure::hmac_sha256bin(pTempBuffer, 1504, genuineFMSKey, 36, (char*)Server + serverDigestOffset); //SECOND 1536 bytes for server response - char pTempHash[32]; - Secure::hmac_sha256bin((char*)Client + keyChallengeIndex, 32, genuineFMSKey, 68, pTempHash); - Secure::hmac_sha256bin((char*)Server + 1536, 1536 - 32, pTempHash, 32, (char*)Server + 1536 * 2 - 32); + if (_validationScheme == 5 && Version == 3){ + //copy exactly from client + memcpy(Server+1536, Client, 1536); + }else{ + char pTempHash[32]; + Secure::hmac_sha256bin((char*)Client + keyChallengeIndex, 32, genuineFMSKey, 68, pTempHash); + Secure::hmac_sha256bin((char*)Server + 1536, 1536 - 32, pTempHash, 32, (char*)Server + 1536 * 2 - 32); + } Server[ -1] = Version; RTMPStream::snd_cnt += 3073; From a4f35ca11e0612788699d2a49570d88be729e883 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Tue, 6 Sep 2016 11:53:40 +0200 Subject: [PATCH 02/10] Made sharedServer::finishEach explicit --- lib/shared_memory.cpp | 23 +++++++++++------------ lib/shared_memory.h | 2 +- src/input/input_buffer.cpp | 1 + 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/lib/shared_memory.cpp b/lib/shared_memory.cpp index d0041fd5..d0e61ec5 100644 --- a/lib/shared_memory.cpp +++ b/lib/shared_memory.cpp @@ -22,6 +22,11 @@ #endif +/// Forces a disconnect to all users. +static void killStatistics(char * data, size_t len, unsigned int id){ + (*(data - 1)) = 126;//Send disconnect message; +} + namespace IPC { #if defined(__CYGWIN__) || defined(_WIN32) @@ -776,7 +781,6 @@ namespace IPC { ///\brief The deconstructor sharedServer::~sharedServer() { - finishEach(); mySemaphore.close(); mySemaphore.unlink(); } @@ -833,21 +837,16 @@ namespace IPC { return false; } - ///Disconnect all connected users + ///Disconnect all connected users, waits at most 2.5 seconds until completed void sharedServer::finishEach(){ if (!hasCounter){ return; } - for (std::set::iterator it = myPages.begin(); it != myPages.end(); it++) { - if (!it->mapped || !it->len) { - break; - } - unsigned int offset = 0; - while (offset + payLen + (hasCounter ? 1 : 0) <= it->len) { - it->mapped[offset] = 126; - offset += payLen + (hasCounter ? 1 : 0); - } - } + unsigned int c = 0;//to prevent eternal loops + do{ + parseEach(killStatistics); + Util::wait(250); + }while(amount && c++ < 10); } ///Returns a pointer to the data for the given index. diff --git a/lib/shared_memory.h b/lib/shared_memory.h index bc8906f2..920ca1b8 100644 --- a/lib/shared_memory.h +++ b/lib/shared_memory.h @@ -186,6 +186,7 @@ namespace IPC { ///\brief The amount of connected clients unsigned int amount; unsigned int connectedUsers; + void finishEach(); private: bool isInUse(unsigned int id); void newPage(); @@ -200,7 +201,6 @@ namespace IPC { semaphore mySemaphore; ///\brief Whether the payload has a counter, if so, it is added in front of the payload bool hasCounter; - void finishEach(); }; ///\brief The client part of a server/client model for shared memory. diff --git a/src/input/input_buffer.cpp b/src/input/input_buffer.cpp index e16b7b0d..76095fb2 100644 --- a/src/input/input_buffer.cpp +++ b/src/input/input_buffer.cpp @@ -396,6 +396,7 @@ namespace Mist { } else if (everHadPush && !resumeMode && config->is_active) { INFO_MSG("Shutting down buffer because resume mode is disabled and the source disconnected"); config->is_active = false; + userPage.finishEach(); } } From bd4c95148897f00cb89ce1319c9d165f05de6929 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Tue, 6 Sep 2016 11:53:55 +0200 Subject: [PATCH 03/10] Fixed segfault in VoD outputs with corrupted headers --- src/output/output.cpp | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/output/output.cpp b/src/output/output.cpp index 517a3801..dc9ad494 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -404,8 +404,12 @@ namespace Mist { } void Output::loadPageForKey(long unsigned int trackId, long long int keyNum){ + if (!myMeta.tracks.count(trackId) || !myMeta.tracks[trackId].keys.size()){ + WARN_MSG("Load for track %lu key %lld aborted - track is empty", trackId, keyNum); + return; + } if (myMeta.vod && keyNum > myMeta.tracks[trackId].keys.rbegin()->getNumber()){ - INFO_MSG("Seek in track %lu to key %lld aborted, is > %lld", trackId, keyNum, myMeta.tracks[trackId].keys.rbegin()->getNumber()); + INFO_MSG("Load for track %lu key %lld aborted, is > %lld", trackId, keyNum, myMeta.tracks[trackId].keys.rbegin()->getNumber()); nProxy.curPage.erase(trackId); currKeyOpen.erase(trackId); return; From ceafaa57e60d243ae80a41e5ad8416f49c85496a Mon Sep 17 00:00:00 2001 From: Thulinma Date: Thu, 15 Sep 2016 15:51:40 +0200 Subject: [PATCH 04/10] Added angel process --- lib/config.cpp | 2 +- lib/procs.h | 6 +- src/controller/controller.cpp | 94 +++++++++++++++++++++++++-- src/controller/controller_storage.cpp | 1 + src/controller/controller_storage.h | 1 + 5 files changed, 93 insertions(+), 11 deletions(-) diff --git a/lib/config.cpp b/lib/config.cpp index 304dc297..d176b353 100644 --- a/lib/config.cpp +++ b/lib/config.cpp @@ -210,7 +210,7 @@ bool Util::Config::parseArgs(int & argc, char ** & argv) { std::cout << "- Flag: Big metadata. Enabled longer live stream durations. Breaks compatibility with DTSH files generated by versions without this flag." << std::endl; #endif std::cout << "Built on " __DATE__ ", " __TIME__ << std::endl; - exit(1); + exit(0); break; default: jsonForEach(vals, it) { diff --git a/lib/procs.h b/lib/procs.h index b74ed258..2c07cda8 100644 --- a/lib/procs.h +++ b/lib/procs.h @@ -17,17 +17,17 @@ namespace Util { private: static bool childRunning(pid_t p); static tthread::mutex plistMutex; - static tthread::thread * reaper_thread; static std::set plist; ///< Holds active process list. - static bool handler_set; ///< If true, the sigchld handler has been setup. static bool thread_handler;///< True while thread handler should be running. static void childsig_handler(int signum); static void exit_handler(); static void runCmd(std::string & cmd); - static void setHandler(); static char* const* dequeToArgv(std::deque & argDeq); static void grim_reaper(void * n); public: + static tthread::thread * reaper_thread; + static bool handler_set; ///< If true, the sigchld handler has been setup. + static void setHandler(); static std::string getOutputOf(char * const * argv); static std::string getOutputOf(std::deque & argDeq); static pid_t StartPiped(char * const * argv, int * fdin, int * fdout, int * fderr); diff --git a/src/controller/controller.cpp b/src/controller/controller.cpp index 67f3af71..7791a26d 100644 --- a/src/controller/controller.cpp +++ b/src/controller/controller.cpp @@ -34,6 +34,7 @@ #include #include #include +#include #include #include #include @@ -88,6 +89,7 @@ void createAccount (std::string account){ } } + /// Status monitoring thread. /// Will check outputs, inputs and converters every five seconds void statusMonitor(void * np){ @@ -119,9 +121,8 @@ void statusMonitor(void * np){ configLock.unlink(); } -///\brief The main entry point for the controller. -int main(int argc, char ** argv){ - +///\brief The main loop for the controller. +int main_loop(int argc, char ** argv){ Controller::Storage = JSON::fromFile("config.json"); JSON::Value stored_port = JSON::fromString("{\"long\":\"port\", \"short\":\"p\", \"arg\":\"integer\", \"help\":\"TCP port to listen on.\"}"); stored_port["default"] = Controller::Storage["config"]["controller"]["port"]; @@ -138,7 +139,6 @@ int main(int argc, char ** argv){ if ( !stored_user["default"]){ stored_user["default"] = "root"; } - Controller::conf = Util::Config(argv[0]); Controller::conf.addOption("port", stored_port); Controller::conf.addOption("interface", stored_interface); Controller::conf.addOption("username", stored_user); @@ -172,6 +172,7 @@ int main(int argc, char ** argv){ if (pipe(pipeErr) >= 0){ dup2(pipeErr[1], STDERR_FILENO);//cause stderr to write to the pipe close(pipeErr[1]);//close the unneeded pipe file descriptor + Util::Procs::socketList.insert(pipeErr[0]); tthread::thread msghandler(Controller::handleMsg, (void*)(((char*)0) + pipeErr[0])); msghandler.detach(); } @@ -224,7 +225,7 @@ int main(int argc, char ** argv){ } }else if(yna(in_string) == 'a'){ //abort controller startup - return 0; + return 1; } } } @@ -246,7 +247,7 @@ int main(int argc, char ** argv){ } }else if(yna(in_string) == 'a'){ //abort controller startup - return 0; + return 1; } } } @@ -283,6 +284,9 @@ int main(int argc, char ** argv){ }else{ shutdown_reason = "socket problem (API port closed)"; } + if (Controller::restarting){ + shutdown_reason = "restart (on request)"; + } Controller::conf.is_active = false; Controller::Log("CONF", "Controller shutting down because of "+shutdown_reason); //join all joinable threads @@ -304,9 +308,85 @@ int main(int argc, char ** argv){ Util::Procs::StopAll(); //give everything some time to print messages Util::wait(100); + std::cout << "Killed all processes, wrote config to disk. Exiting." << std::endl; + if (Controller::restarting){ + return 42; + } //close stderr to make the stderr reading thread exit close(STDERR_FILENO); - std::cout << "Killed all processes, wrote config to disk. Exiting." << std::endl; + return 0; +} + +void handleUSR1(int signum, siginfo_t * sigInfo, void * ignore){ + Controller::Log("CONF", "USR1 received - restarting controller"); + Controller::restarting = true; + raise(SIGINT); //trigger restart +} + +///\brief The controller angel process. +///Starts a forked main_loop in a loop. Yes, you read that right. +int main(int argc, char ** argv){ + Util::Procs::setHandler();//set child handler + { + struct sigaction new_action; + struct sigaction cur_action; + new_action.sa_sigaction = handleUSR1; + sigemptyset(&new_action.sa_mask); + new_action.sa_flags = 0; + sigaction(SIGUSR1, &new_action, NULL); + } + + Controller::conf = Util::Config(argv[0]); + Controller::conf.activate(); + uint64_t reTimer = 0; + while (Controller::conf.is_active){ + pid_t pid = fork(); + if (pid == 0){ + Util::Procs::handler_set = false; + Util::Procs::reaper_thread = 0; + { + struct sigaction new_action; + struct sigaction cur_action; + new_action.sa_sigaction = handleUSR1; + sigemptyset(&new_action.sa_mask); + new_action.sa_flags = 0; + sigaction(SIGUSR1, &new_action, NULL); + } + return main_loop(argc, argv); + } + if (pid == -1){ + FAIL_MSG("Unable to spawn controller process!"); + return 2; + } + //wait for the process to exit + int status; + while (waitpid(pid, &status, 0) != pid && errno == EINTR){ + if (Controller::restarting){ + Controller::conf.is_active = true; + Controller::restarting = false; + kill(pid, SIGUSR1); + } + if (!Controller::conf.is_active){ + INFO_MSG("Shutting down controller because of signal interrupt..."); + Util::Procs::Stop(pid); + } + continue; + } + //if the exit was clean, don't restart it + if (WIFEXITED(status) && (WEXITSTATUS(status) == 0)){ + MEDIUM_MSG("Controller shut down cleanly"); + break; + } + if (WIFEXITED(status) && (WEXITSTATUS(status) == 42)){ + WARN_MSG("Refreshing angel process for update"); + std::string myFile = Util::getMyPath() + "MistController"; + execvp(myFile.c_str(), argv); + FAIL_MSG("Error restarting: %s", strerror(errno)); + } + INFO_MSG("Controller uncleanly shut down! Restarting in %llu...", reTimer); + Util::wait(reTimer); + reTimer += 1000; + } return 0; } diff --git a/src/controller/controller_storage.cpp b/src/controller/controller_storage.cpp index 35e54e6f..3eef516f 100644 --- a/src/controller/controller_storage.cpp +++ b/src/controller/controller_storage.cpp @@ -16,6 +16,7 @@ namespace Controller { tthread::mutex configMutex; tthread::mutex logMutex; bool configChanged = false; + bool restarting = false; ///\brief Store and print a log message. ///\param kind The type of message. diff --git a/src/controller/controller_storage.h b/src/controller/controller_storage.h index bd65fff6..0adad412 100644 --- a/src/controller/controller_storage.h +++ b/src/controller/controller_storage.h @@ -9,6 +9,7 @@ namespace Controller { extern tthread::mutex logMutex;///< Mutex for log thread. extern tthread::mutex configMutex;///< Mutex for server config access. extern bool configChanged; ///< Bool that indicates config must be written to SHM. + extern bool restarting;///< Signals if the controller is shutting down (false) or restarting (true). /// Store and print a log message. void Log(std::string kind, std::string message); From 4472d00e69a11ced531bb180369ed9f5fd2a7fc5 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Thu, 15 Sep 2016 15:53:23 +0200 Subject: [PATCH 05/10] Improved sharedClient isAlive handling --- lib/shared_memory.cpp | 40 ++++++++++++++++++++++++++++------------ src/output/output.cpp | 8 ++++++++ 2 files changed, 36 insertions(+), 12 deletions(-) diff --git a/lib/shared_memory.cpp b/lib/shared_memory.cpp index d0e61ec5..49030a22 100644 --- a/lib/shared_memory.cpp +++ b/lib/shared_memory.cpp @@ -24,7 +24,7 @@ /// Forces a disconnect to all users. static void killStatistics(char * data, size_t len, unsigned int id){ - (*(data - 1)) = 126;//Send disconnect message; + (*(data - 1)) = 60 | ((*(data - 1))&0x80);//Send disconnect message; } namespace IPC { @@ -916,17 +916,18 @@ namespace IPC { if (*counter & 0x80){ connectedUsers++; } + char countNum = (*counter) & 0x7F; if (id >= amount) { amount = id + 1; VERYHIGH_MSG("Shared memory %s is now at count %u", baseName.c_str(), amount); } uint32_t tmpPID = *((uint32_t *)(it->mapped + 1 + offset + payLen - 4)); - if (!Util::Procs::isRunning(tmpPID) && !(*counter == 126 || *counter == 127)){ + if (!Util::Procs::isRunning(tmpPID) && !(countNum == 126 || countNum == 127)){ WARN_MSG("process disappeared, timing out. (pid %lu)", tmpPID); - *counter = 126; //if process is already dead, instant timeout. + *counter = 125 | (0x80 & (*counter)); //if process is already dead, instant timeout. } callback(it->mapped + offset + 1, payLen, id); - switch (*counter) { + switch (countNum) { case 127: HIGH_MSG("Client %u requested disconnect", id); break; @@ -936,9 +937,9 @@ namespace IPC { default: #ifndef NOCRASHCHECK if (tmpPID) { - if (*counter > 10 && *counter < 126) { - if (*counter < 30) { - if (*counter > 15) { + if (countNum > 10 && countNum < 60) { + if (countNum < 30) { + if (countNum > 15) { WARN_MSG("Process %d is unresponsive", tmpPID); } Util::Procs::Stop(tmpPID); //soft kill @@ -947,11 +948,22 @@ namespace IPC { Util::Procs::Murder(tmpPID); //improved kill } } + if (countNum > 70) { + if (countNum < 90) { + if (countNum > 75) { + WARN_MSG("Stopping process %d is unresponsive", tmpPID); + } + Util::Procs::Stop(tmpPID); //soft kill + } else { + ERROR_MSG("Killing unresponsive stopping process %d", tmpPID); + Util::Procs::Murder(tmpPID); //improved kill + } + } } #endif break; } - if (*counter == 127 || *counter == 126){ + if (countNum == 127 || countNum == 126){ memset(it->mapped + offset + 1, 0, payLen); it->mapped[offset] = 0; } else { @@ -1153,7 +1165,8 @@ namespace IPC { } if (myPage.mapped) { semGuard tmpGuard(&mySemaphore); - myPage.mapped[offsetOnPage] = 126; + myPage.mapped[offsetOnPage] = 126 | (countAsViewer?0x80:0); + HIGH_MSG("sharedClient finished ID %d", offsetOnPage/(payLen+1)); } } @@ -1163,16 +1176,19 @@ namespace IPC { DEBUG_MSG(DLVL_WARN, "Trying to keep-alive an element without counters"); return; } - if ((myPage.mapped[offsetOnPage] & 0x7F) < 126) { + if (isAlive()){ myPage.mapped[offsetOnPage] = (countAsViewer ? 0x81 : 0x01); } } bool sharedClient::isAlive() { if (!hasCounter) { - return true; + return (myPage.mapped != 0); } - return (myPage.mapped[offsetOnPage] & 0x7F) < 126; + if (myPage.mapped){ + return (myPage.mapped[offsetOnPage] & 0x7F) < 60; + } + return false; } ///\brief Get a pointer to the data of this client diff --git a/src/output/output.cpp b/src/output/output.cpp index dc9ad494..449623fa 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -66,6 +66,10 @@ namespace Mist { } void Output::updateMeta(){ + //cancel if not alive + if (!nProxy.userClient.isAlive()){ + return; + } //read metadata from page to myMeta variable if (nProxy.metaPages[0].mapped){ IPC::semaphore * liveSem = 0; @@ -913,6 +917,10 @@ namespace Mist { //when live, every keyframe, check correctness of the keyframe number if (myMeta.live && thisPacket.getFlag("keyframe")){ + //cancel if not alive + if (!nProxy.userClient.isAlive()){ + return false; + } //Check whether returned keyframe is correct. If not, wait for approximately 10 seconds while checking. //Failure here will cause tracks to drop due to inconsistent internal state. nxtKeyNum[nxt.tid] = getKeyForTime(nxt.tid, thisPacket.getTime()); From c74b3e4b8adf405c64cca408c3d37c7f51a374b3 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Wed, 7 Sep 2016 08:25:13 +0200 Subject: [PATCH 06/10] Added XSplit bandwidth check output suppression --- src/output/output_rtmp.cpp | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/output/output_rtmp.cpp b/src/output/output_rtmp.cpp index 62ae4ac8..8d1e21a0 100644 --- a/src/output/output_rtmp.cpp +++ b/src/output/output_rtmp.cpp @@ -426,6 +426,16 @@ namespace Mist { void OutRTMP::parseAMFCommand(AMF::Object & amfData, int messageType, int streamId) { MEDIUM_MSG("Received command: %s", amfData.Print().c_str()); HIGH_MSG("AMF0 command: %s", amfData.getContentP(0)->StrValue().c_str()); + if (amfData.getContentP(0)->StrValue() == "xsbwtest") { + //send a _result reply + AMF::Object amfReply("container", AMF::AMF0_DDV_CONTAINER); + amfReply.addContent(AMF::Object("", "_error")); //result success + amfReply.addContent(amfData.getContent(1)); //same transaction ID + amfReply.addContent(AMF::Object("", amfData.getContentP(0)->StrValue())); //null - command info + amfReply.addContent(AMF::Object("", "Hai XSplit user!")); //stream ID? + sendCommand(amfReply, messageType, streamId); + return; + } if (amfData.getContentP(0)->StrValue() == "connect") { double objencoding = 0; if (amfData.getContentP(2)->getContentP("objectEncoding")) { From 53dfcfe1314292396ed74fe08ef39432d917d330 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Thu, 15 Sep 2016 15:55:17 +0200 Subject: [PATCH 07/10] Fixed VoD no-load/slow-load 100% CPU usage --- src/output/output.cpp | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/output/output.cpp b/src/output/output.cpp index 449623fa..618c7af4 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -849,25 +849,28 @@ namespace Mist { //check where the next key is nxtKeyNum[nxt.tid] = getKeyForTime(nxt.tid, thisPacket.getTime()); int nextPage = pageNumForKey(nxt.tid, nxtKeyNum[nxt.tid]+1); - //are we live, and the next key hasn't shown up on another page, then we're waiting. - if (myMeta.live && currKeyOpen.count(nxt.tid) && (currKeyOpen[nxt.tid] == (unsigned int)nextPage || nextPage == -1)){ + //if the next key hasn't shown up on another page, then we're waiting. + //VoD might be slow, so we check VoD case also, just in case + if (currKeyOpen.count(nxt.tid) && (currKeyOpen[nxt.tid] == (unsigned int)nextPage || nextPage == -1)){ if (++emptyCount < 100){ Util::wait(250); //we're waiting for new data to show up if (emptyCount % 8 == 0){ reconnect();//reconnect every 2 seconds }else{ - if (emptyCount % 4 == 0){ + //updating meta is only useful with live streams + if (myMeta.live && emptyCount % 4 == 0){ updateMeta(); } } }else{ //after ~25 seconds, give up and drop the track. - dropTrack(nxt.tid, "could not reload empty packet"); + dropTrack(nxt.tid, "EOP: data wait timeout"); } return false; } + //The next key showed up on another page! //We've simply reached the end of the page. Load the next key = next page. loadPageForKey(nxt.tid, ++nxtKeyNum[nxt.tid]); nxt.offset = 0; From 49cb493b7e4c09f1fba28ab82d07655426c2bdcd Mon Sep 17 00:00:00 2001 From: Thulinma Date: Wed, 14 Sep 2016 19:39:58 +0200 Subject: [PATCH 08/10] Improved output connect to input timeout behaviour --- src/output/output.cpp | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/src/output/output.cpp b/src/output/output.cpp index 618c7af4..b103cf65 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -167,22 +167,30 @@ namespace Mist { if (statsPage.getData()){ statsPage.finish(); } - statsPage = IPC::sharedClient(SHM_STATISTICS, STAT_EX_SIZE, true); if (nProxy.userClient.getData()){ nProxy.userClient.finish(); } char userPageName[NAME_BUFFER_SIZE]; snprintf(userPageName, NAME_BUFFER_SIZE, SHM_USERS, streamName.c_str()); - nProxy.userClient = IPC::sharedClient(userPageName, PLAY_EX_SIZE, true); + unsigned int attempts = 0; + while (!nProxy.userClient.isAlive() && ++attempts < 20 && Util::streamAlive(streamName)){ + nProxy.userClient = IPC::sharedClient(userPageName, PLAY_EX_SIZE, true); + } + if (!nProxy.userClient.isAlive()){ + FAIL_MSG("Could not register as client for %s", streamName.c_str()); + onFail(); + return; + } char pageId[NAME_BUFFER_SIZE]; snprintf(pageId, NAME_BUFFER_SIZE, SHM_STREAM_INDEX, streamName.c_str()); nProxy.metaPages.clear(); nProxy.metaPages[0].init(pageId, DEFAULT_STRM_PAGE_SIZE); if (!nProxy.metaPages[0].mapped){ - FAIL_MSG("Could not connect to server for %s", streamName.c_str()); + FAIL_MSG("Could not connect to data for %s", streamName.c_str()); onFail(); return; } + statsPage = IPC::sharedClient(SHM_STATISTICS, STAT_EX_SIZE, true); stats(true); updateMeta(); selectDefaultTracks(); From 7518014703bfcb6572a4c1b9d094b58523375eb6 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Wed, 14 Sep 2016 19:44:38 +0200 Subject: [PATCH 09/10] Added DTSH versioning, made bigMeta the default --- lib/dtsc.h | 49 ++++++++------------ lib/dtscmeta.cpp | 117 +++++++++++++++++++---------------------------- 2 files changed, 65 insertions(+), 101 deletions(-) diff --git a/lib/dtsc.h b/lib/dtsc.h index ae387a7c..02b32f30 100644 --- a/lib/dtsc.h +++ b/lib/dtsc.h @@ -19,6 +19,12 @@ #define DTSC_ARR 0x0A #define DTSC_CON 0xFF +//Increase this value every time the DTSH file format changes in an incompatible way +//Changelog: +// Version 0-2: Undocumented changes +// Version 3: switched to bigMeta-style by default, Parts layout switched from 3/2/4 to 3/3/3 bytes +#define DTSH_VERSION 3 + namespace DTSC { ///\brief This enum holds all possible datatypes for DTSC packets. @@ -125,6 +131,7 @@ namespace DTSC { int getDataLen() const; int getPayloadLen() const; JSON::Value toJSON() const; + std::string toSummary() const; Scan getScan() const; protected: bool master; @@ -173,21 +180,22 @@ namespace DTSC { ///\brief Basic class for storage of data associated with single DTSC packets, a.k.a. parts. class Part { public: - long getSize(); - void setSize(long newSize); - short getDuration(); - void setDuration(short newDuration); - long getOffset(); - void setOffset(long newOffset); + uint32_t getSize(); + void setSize(uint32_t newSize); + uint32_t getDuration(); + void setDuration(uint32_t newDuration); + uint32_t getOffset(); + void setOffset(uint32_t newOffset); char * getData(); void toPrettyString(std::ostream & str, int indent = 0); private: +#define PACKED_PART_SIZE 9 ///\brief Data storage for this Part. /// /// - 3 bytes: MSB storage of the payload size of this packet in bytes. - /// - 2 bytes: MSB storage of the duration of this packet in milliseconds. - /// - 4 bytes: MSB storage of the presentation time offset of this packet in milliseconds. - char data[9]; + /// - 3 bytes: MSB storage of the duration of this packet in milliseconds. + /// - 3 bytes: MSB storage of the presentation time offset of this packet in milliseconds. + char data[PACKED_PART_SIZE]; }; ///\brief Basic class for storage of data associated with keyframes. @@ -208,7 +216,6 @@ namespace DTSC { char * getData(); void toPrettyString(std::ostream & str, int indent = 0); private: -#ifdef BIGMETA #define PACKED_KEY_SIZE 25 ///\brief Data storage for this Key. /// @@ -217,16 +224,6 @@ namespace DTSC { /// - 4 bytes: MSB storage of the number of this keyframe. /// - 2 bytes: MSB storage of the amount of parts in this keyframe. /// - 8 bytes: MSB storage of the timestamp associated with this keyframe's first packet. -#else -#define PACKED_KEY_SIZE 16 - ///\brief Data storage for this Key. - /// - /// - 5 bytes: MSB storage of the position of the first packet of this keyframe within the file. - /// - 3 bytes: MSB storage of the duration of this keyframe. - /// - 2 bytes: MSB storage of the number of this keyframe. - /// - 2 bytes: MSB storage of the amount of parts in this keyframe. - /// - 4 bytes: MSB storage of the timestamp associated with this keyframe's first packet. -#endif char data[PACKED_KEY_SIZE]; }; @@ -244,7 +241,6 @@ namespace DTSC { char * getData(); void toPrettyString(std::ostream & str, int indent = 0); private: -#ifdef BIGMETA #define PACKED_FRAGMENT_SIZE 13 ///\brief Data storage for this Fragment. /// @@ -252,15 +248,6 @@ namespace DTSC { /// - 1 byte: length (amount of keyframes) /// - 4 bytes: number of first keyframe in fragment /// - 4 bytes: size of fragment in bytes -#else -#define PACKED_FRAGMENT_SIZE 11 - ///\brief Data storage for this Fragment. - /// - /// - 4 bytes: duration (in milliseconds) - /// - 1 byte: length (amount of keyframes) - /// - 2 bytes: number of first keyframe in fragment - /// - 4 bytes: size of fragment in bytes -#endif char data[PACKED_FRAGMENT_SIZE]; }; @@ -335,12 +322,14 @@ namespace DTSC { void writeTo(char * p); JSON::Value toJSON(); void reset(); + bool toFile(const std::string & fileName); void toPrettyString(std::ostream & str, int indent = 0, int verbosity = 0); //members: std::map tracks; bool vod; bool live; bool merged; + uint16_t version; long long int moreheader; long long int bufferWindow; }; diff --git a/lib/dtscmeta.cpp b/lib/dtscmeta.cpp index 3eb338fc..6a4c5dd1 100644 --- a/lib/dtscmeta.cpp +++ b/lib/dtscmeta.cpp @@ -4,6 +4,7 @@ #include #include #include +#include #define AUDIO_KEY_INTERVAL 5000 ///< This define controls the keyframe interval for non-video tracks, such as audio and metadata tracks. @@ -437,6 +438,18 @@ namespace DTSC { return result; } + std::string Packet::toSummary() const { + std::stringstream out; + char * res = 0; + unsigned int len = 0; + getString("data", res, len); + out << getTrackId() << "@" << getTime() << ": " << len << " bytes"; + if (hasMember("keyframe")){ + out << " (keyframe)"; + } + return out.str(); + } + /// Create an invalid DTSC::Scan object by default. Scan::Scan() { p = 0; @@ -840,33 +853,33 @@ namespace DTSC { ///\brief Returns the payloadsize of a part - long Part::getSize() { + uint32_t Part::getSize() { return Bit::btoh24(data); } ///\brief Sets the payloadsize of a part - void Part::setSize(long newSize) { + void Part::setSize(uint32_t newSize) { Bit::htob24(data, newSize); } - ///\brief Retruns the duration of a part - short Part::getDuration() { - return Bit::btohs(data + 3); + ///\brief Returns the duration of a part + uint32_t Part::getDuration() { + return Bit::btoh24(data + 3); } ///\brief Sets the duration of a part - void Part::setDuration(short newDuration) { - Bit::htobs(data + 3, newDuration); + void Part::setDuration(uint32_t newDuration) { + Bit::htob24(data + 3, newDuration); } ///\brief returns the offset of a part - long Part::getOffset() { - return Bit::btohl(data + 5); + uint32_t Part::getOffset() { + return Bit::btoh24(data + 6); } ///\brief Sets the offset of a part - void Part::setOffset(long newOffset) { - Bit::htobl(data + 5, newOffset); + void Part::setOffset(uint32_t newOffset) { + Bit::htob24(data + 6, newOffset); } ///\brief Returns the data of a part @@ -883,93 +896,49 @@ namespace DTSC { ///\brief Returns the byteposition of a keyframe unsigned long long Key::getBpos() { -#ifdef BIGMETA return Bit::btohll(data); -#else - return (((unsigned long long)data[0] << 32) | (data[1] << 24) | (data[2] << 16) | (data[3] << 8) | data[4]); -#endif } void Key::setBpos(unsigned long long newBpos) { -#ifdef BIGMETA Bit::htobll(data, newBpos); -#else - data[4] = newBpos & 0xFF; - data[3] = (newBpos >> 8) & 0xFF; - data[2] = (newBpos >> 16) & 0xFF; - data[1] = (newBpos >> 24) & 0xFF; - data[0] = (newBpos >> 32) & 0xFF; -#endif } unsigned long Key::getLength() { -#ifdef BIGMETA return Bit::btoh24(data+8); -#else - return Bit::btoh24(data+5); -#endif } void Key::setLength(unsigned long newLength) { -#ifdef BIGMETA Bit::htob24(data+8, newLength); -#else - Bit::htob24(data+5, newLength); -#endif } ///\brief Returns the number of a keyframe unsigned long Key::getNumber() { -#ifdef BIGMETA return Bit::btohl(data + 11); -#else - return Bit::btohs(data + 8); -#endif } ///\brief Sets the number of a keyframe void Key::setNumber(unsigned long newNumber) { -#ifdef BIGMETA Bit::htobl(data + 11, newNumber); -#else - Bit::htobs(data + 8, newNumber); -#endif } ///\brief Returns the number of parts of a keyframe unsigned short Key::getParts() { -#ifdef BIGMETA return Bit::btohs(data + 15); -#else - return Bit::btohs(data + 10); -#endif } ///\brief Sets the number of parts of a keyframe void Key::setParts(unsigned short newParts) { -#ifdef BIGMETA Bit::htobs(data + 15, newParts); -#else - Bit::htobs(data + 10, newParts); -#endif } ///\brief Returns the timestamp of a keyframe unsigned long long Key::getTime() { -#ifdef BIGMETA return Bit::btohll(data + 17); -#else - return Bit::btohl(data + 12); -#endif } ///\brief Sets the timestamp of a keyframe void Key::setTime(unsigned long long newTime) { -#ifdef BIGMETA Bit::htobll(data + 17, newTime); -#else - Bit::htobl(data + 12, newTime); -#endif } ///\brief Returns the data of this keyframe struct @@ -1006,38 +975,22 @@ namespace DTSC { ///\brief Returns the number of the first keyframe in this fragment unsigned long Fragment::getNumber() { -#ifdef BIGMETA return Bit::btohl(data + 5); -#else - return Bit::btohs(data + 5); -#endif } ///\brief Sets the number of the first keyframe in this fragment void Fragment::setNumber(unsigned long newNumber) { -#ifdef BIGMETA Bit::htobl(data + 5, newNumber); -#else - Bit::htobs(data + 5, newNumber); -#endif } ///\brief Returns the size of a fragment unsigned long Fragment::getSize() { -#ifdef BIGMETA return Bit::btohl(data + 9); -#else - return Bit::btohl(data + 7); -#endif } ///\brief Sets the size of a fragement void Fragment::setSize(unsigned long newSize) { -#ifdef BIGMETA Bit::htobl(data + 9, newSize); -#else - Bit::htobl(data + 7, newSize); -#endif } ///\brief Returns thte data of this fragment structure @@ -1294,6 +1247,7 @@ namespace DTSC { Meta::Meta() { vod = false; live = false; + version = DTSH_VERSION; moreheader = 0; merged = false; bufferWindow = 0; @@ -1307,6 +1261,7 @@ namespace DTSC { tracks.clear(); vod = source.getFlag("vod"); live = source.getFlag("live"); + version = source.getInt("version"); merged = source.getFlag("merged"); bufferWindow = source.getInt("buffer_window"); moreheader = source.getInt("moreheader"); @@ -1329,6 +1284,7 @@ namespace DTSC { Meta::Meta(JSON::Value & meta) { vod = meta.isMember("vod") && meta["vod"]; live = meta.isMember("live") && meta["live"]; + version = meta.isMember("version") ? meta["version"].asInt() : 0; merged = meta.isMember("merged") && meta["merged"]; bufferWindow = 0; if (meta.isMember("buffer_window")) { @@ -1713,6 +1669,7 @@ namespace DTSC { dataLen += it->second.getSendLen(skipDynamic); } } + if (version){dataLen += 17;} return dataLen + 8; //add 8 bytes header } @@ -1738,6 +1695,10 @@ namespace DTSC { writePointer(p, "\000\006merged\001", 9); writePointer(p, convertLongLong(1), 8); } + if (version) { + writePointer(p, "\000\006version\001", 9); + writePointer(p, convertLongLong(version), 8); + } if (bufferWindow) { writePointer(p, "\000\015buffer_window\001", 16); writePointer(p, convertLongLong(bufferWindow), 8); @@ -1771,6 +1732,10 @@ namespace DTSC { conn.SendNow("\000\006merged\001", 9); conn.SendNow(convertLongLong(1), 8); } + if (version) { + conn.SendNow("\000\006version\001", 9); + conn.SendNow(convertLongLong(version), 8); + } if (bufferWindow) { conn.SendNow("\000\015buffer_window\001", 16); conn.SendNow(convertLongLong(bufferWindow), 8); @@ -1855,10 +1820,20 @@ namespace DTSC { if (bufferWindow) { result["buffer_window"] = bufferWindow; } + if (version) { + result["version"] = (long long)version; + } result["moreheader"] = moreheader; return result; } + ///\brief Writes metadata to a filename. Wipes existing contents, if any. + bool Meta::toFile(const std::string & fileName){ + std::ofstream oFile(fileName.c_str()); + oFile << toJSON().toNetPacked(); + oFile.close(); + } + ///\brief Converts a meta object to a human readable string ///\param str The stringstream to append to ///\param indent the amount of indentation needed From fedd18146ee7d89fec763bd2cf9e5fb794a0e6e4 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Thu, 15 Sep 2016 15:56:17 +0200 Subject: [PATCH 10/10] Generalized DTSH header reading and writing of Inputs --- src/input/input.cpp | 14 ++++++++++++++ src/input/input.h | 1 + src/input/input_flv.cpp | 20 ++++---------------- src/input/input_mp3.cpp | 16 +++------------- src/input/input_ogg.cpp | 6 +----- 5 files changed, 23 insertions(+), 34 deletions(-) diff --git a/src/input/input.cpp b/src/input/input.cpp index c7230888..e31e8fc7 100644 --- a/src/input/input.cpp +++ b/src/input/input.cpp @@ -533,5 +533,19 @@ namespace Mist { void Input::quitPlay(){ playing = 0; } + + bool Input::readExistingHeader(){ + DTSC::File tmpdtsh(config->getString("input") + ".dtsh"); + if (!tmpdtsh){ + return false; + } + if (tmpdtsh.getMeta().version != DTSH_VERSION){ + INFO_MSG("Updating wrong version header file from version %llu to %llu", tmpdtsh.getMeta().version, DTSH_VERSION); + return false; + } + myMeta = tmpdtsh.getMeta(); + return true; + } + } diff --git a/src/input/input.h b/src/input/input.h index 25589429..996f4261 100644 --- a/src/input/input.h +++ b/src/input/input.h @@ -29,6 +29,7 @@ namespace Mist { static void callbackWrapper(char * data, size_t len, unsigned int id); virtual bool setup() = 0; virtual bool readHeader() = 0; + virtual bool readExistingHeader(); virtual bool atKeyFrame(); virtual void getNext(bool smart = true) {}; virtual void seek(int seekTime){}; diff --git a/src/input/input_flv.cpp b/src/input/input_flv.cpp index 66aa218b..53412c9c 100644 --- a/src/input/input_flv.cpp +++ b/src/input/input_flv.cpp @@ -49,23 +49,13 @@ namespace Mist { } bool inputFLV::readHeader() { - JSON::Value lastPack; - if (!inFile) { - return false; - } + if (!inFile){return false;} //See whether a separate header file exists. - DTSC::File tmp(config->getString("input") + ".dtsh"); - if (tmp){ - myMeta = tmp.getMeta(); - if (myMeta){ - return true; - }else{ - myMeta = DTSC::Meta(); - } - } + if (readExistingHeader()){return true;} //Create header file from FLV data fseek(inFile, 13, SEEK_SET); AMF::Object amf_storage; + JSON::Value lastPack; long long int lastBytePos = 13; while (!feof(inFile) && !FLV::Parse_Error){ if (tmpTag.FileLoader(inFile)){ @@ -80,9 +70,7 @@ namespace Mist { std::cerr << FLV::Error_Str << std::endl; return false; } - std::ofstream oFile(std::string(config->getString("input") + ".dtsh").c_str()); - oFile << myMeta.toJSON().toNetPacked(); - oFile.close(); + myMeta.toFile(config->getString("input") + ".dtsh"); return true; } diff --git a/src/input/input_mp3.cpp b/src/input/input_mp3.cpp index 7cf0b21d..3307923c 100644 --- a/src/input/input_mp3.cpp +++ b/src/input/input_mp3.cpp @@ -47,17 +47,9 @@ namespace Mist { } bool inputMP3::readHeader() { - if (!inFile) { - return false; - } + if (!inFile){return false;} //See whether a separate header file exists. - DTSC::File tmp(config->getString("input") + ".dtsh"); - if (tmp){ - myMeta = tmp.getMeta(); - if (myMeta){ - return true; - } - } + if (readExistingHeader()){return true;} myMeta = DTSC::Meta(); myMeta.tracks[1].trackID = 1; myMeta.tracks[1].type = "audio"; @@ -93,9 +85,7 @@ namespace Mist { fseek(inFile, 0, SEEK_SET); timestamp = 0; - std::ofstream oFile(std::string(config->getString("input") + ".dtsh").c_str()); - oFile << myMeta.toJSON().toNetPacked(); - oFile.close(); + myMeta.toFile(config->getString("input") + ".dtsh"); return true; } diff --git a/src/input/input_ogg.cpp b/src/input/input_ogg.cpp index 06e20f6a..c5a95237 100644 --- a/src/input/input_ogg.cpp +++ b/src/input/input_ogg.cpp @@ -222,11 +222,7 @@ namespace Mist { getNext(); } - std::ofstream oFile(std::string(config->getString("input") + ".dtsh").c_str()); - oFile << myMeta.toJSON().toNetPacked(); - oFile.close(); - - //myMeta.toPrettyString(std::cout); + myMeta.toFile(config->getString("input") + ".dtsh"); return true; }