diff --git a/lib/procs.cpp b/lib/procs.cpp index 3fb7212c..f872f109 100644 --- a/lib/procs.cpp +++ b/lib/procs.cpp @@ -270,7 +270,7 @@ pid_t Util::Procs::StartPiped(std::deque & argDeq, int * fdin, int /// \arg fdin Standard input file descriptor. If null, /dev/null is assumed. Otherwise, if arg contains -1, a new fd is automatically allocated and written into this arg. Then the arg will be used as fd. /// \arg fdout Same as fdin, but for stdout. /// \arg fdout Same as fdin, but for stderr. -pid_t Util::Procs::StartPiped(char * const * argv, int * fdin, int * fdout, int * fderr) { +pid_t Util::Procs::StartPiped(const char * const * argv, int * fdin, int * fdout, int * fderr) { pid_t pid; int pipein[2], pipeout[2], pipeerr[2]; //DEBUG_MSG(DLVL_DEVEL, "setHandler"); @@ -364,7 +364,8 @@ pid_t Util::Procs::StartPiped(char * const * argv, int * fdin, int * fdout, int if (devnull != -1) { close(devnull); } - execvp(argv[0], argv); + //Because execvp requires a char* const* and we have a const char* const* + execvp(argv[0], (char* const*)argv); DEBUG_MSG(DLVL_ERROR, "execvp failed for process %s, reason: %s", argv[0], strerror(errno)); exit(42); } else if (pid == -1) { diff --git a/lib/procs.h b/lib/procs.h index 667f570b..25d78e5a 100644 --- a/lib/procs.h +++ b/lib/procs.h @@ -30,7 +30,7 @@ namespace Util { static void setHandler(); static std::string getOutputOf(char * const * argv); static std::string getOutputOf(std::deque & argDeq); - static pid_t StartPiped(char * const * argv, int * fdin, int * fdout, int * fderr); + static pid_t StartPiped(const char * const * argv, int * fdin, int * fdout, int * fderr); static pid_t StartPiped(std::deque & argDeq, int * fdin, int * fdout, int * fderr); static void Stop(pid_t name); static void Murder(pid_t name); diff --git a/lib/shared_memory.cpp b/lib/shared_memory.cpp index ba4e656b..d3fdf8c3 100644 --- a/lib/shared_memory.cpp +++ b/lib/shared_memory.cpp @@ -873,7 +873,6 @@ namespace IPC { empty = (char *)malloc(payLen * sizeof(char)); memset(empty, 0, payLen); } - semGuard tmpGuard(&mySemaphore); unsigned int id = 0; for (std::deque::iterator it = myPages.begin(); it != myPages.end(); it++) { if (!it->mapped || !it->len) { @@ -911,7 +910,6 @@ namespace IPC { empty = (char *)malloc(payLen * sizeof(char)); memset(empty, 0, payLen); } - semGuard tmpGuard(&mySemaphore); unsigned int id = 0; unsigned int userCount = 0; unsigned int emptyCount = 0; @@ -940,7 +938,7 @@ namespace IPC { VERYHIGH_MSG("Shared memory %s is now at count %u", baseName.c_str(), amount); } uint32_t tmpPID = *((uint32_t *)(it->mapped + 1 + offset + payLen - 4)); - if (!Util::Procs::isRunning(tmpPID) && !(countNum == 126 || countNum == 127)){ + if (tmpPID > 1 && !Util::Procs::isRunning(tmpPID) && !(countNum == 126 || countNum == 127)){ WARN_MSG("process disappeared, timing out. (pid %lu)", tmpPID); *counter = 125 | (0x80 & (*counter)); //if process is already dead, instant timeout. } @@ -954,7 +952,7 @@ namespace IPC { break; default: #ifndef NOCRASHCHECK - if (tmpPID) { + if (tmpPID > 1) { if (countNum > 10 && countNum < 60) { if (countNum < 30) { if (countNum > 15) { @@ -982,6 +980,7 @@ namespace IPC { break; } if (countNum == 127 || countNum == 126){ + semGuard tmpGuard(&mySemaphore); if (disconCallback){ disconCallback(it->mapped + offset + 1, payLen, id); } @@ -1039,8 +1038,10 @@ namespace IPC { } if (emptyCount > 1) { + semGuard tmpGuard(&mySemaphore); deletePage(); } else if (!emptyCount) { + semGuard tmpGuard(&mySemaphore); newPage(); } @@ -1075,7 +1076,6 @@ namespace IPC { DEBUG_MSG(DLVL_FAIL, "Creating semaphore failed: %s", strerror(errno)); return; } - semGuard tmpGuard(&mySemaphore); myPage.init(rhs.myPage.name, rhs.myPage.len, rhs.myPage.master); offsetOnPage = rhs.offsetOnPage; } @@ -1096,7 +1096,6 @@ namespace IPC { DEBUG_MSG(DLVL_FAIL, "Creating copy of semaphore %s failed: %s", baseName.c_str(), strerror(errno)); return; } - semGuard tmpGuard(&mySemaphore); myPage.init(rhs.myPage.name, rhs.myPage.len, rhs.myPage.master); offsetOnPage = rhs.offsetOnPage; } @@ -1129,7 +1128,6 @@ namespace IPC { } while (offsetOnPage == -1) { { - semGuard tmpGuard(&mySemaphore); for (char i = 'A'; i <= 'Z'; i++) { myPage.init(baseName.substr(1) + i, (4096 << (i - 'A')), false, false); if (!myPage.mapped) { @@ -1138,13 +1136,16 @@ namespace IPC { int offset = 0; while (offset + payLen + (hasCounter ? 1 : 0) <= myPage.len) { if ((hasCounter && myPage.mapped[offset] == 0) || (!hasCounter && !memcmp(myPage.mapped + offset, empty, payLen))) { - offsetOnPage = offset; - if (hasCounter) { - myPage.mapped[offset] = 1; - *((uint32_t *)(myPage.mapped + 1 + offset + len - 4)) = getpid(); - HIGH_MSG("sharedClient received ID %d", offsetOnPage/(payLen+1)); + semGuard tmpGuard(&mySemaphore); + if ((hasCounter && myPage.mapped[offset] == 0) || (!hasCounter && !memcmp(myPage.mapped + offset, empty, payLen))) { + offsetOnPage = offset; + if (hasCounter) { + myPage.mapped[offset] = 1; + *((uint32_t *)(myPage.mapped + 1 + offset + len - 4)) = getpid(); + HIGH_MSG("sharedClient received ID %d", offsetOnPage/(payLen+1)); + } + break; } - break; } offset += payLen + (hasCounter ? 1 : 0); } diff --git a/lib/socket.cpp b/lib/socket.cpp index 1ff5b049..4fda3f80 100644 --- a/lib/socket.cpp +++ b/lib/socket.cpp @@ -400,7 +400,7 @@ Socket::Connection::Connection(std::string host, int port, bool nonblock){ /// and when the socket is closed manually. /// \returns True if socket is connected, false otherwise. bool Socket::Connection::connected() const{ - return (sock >= 0) || ((pipes[0] >= 0) && (pipes[1] >= 0)); + return (sock >= 0) || ((pipes[0] >= 0) || (pipes[1] >= 0)); } /// Returns the time this socket has been connected. diff --git a/src/output/output_httpts.cpp b/src/output/output_httpts.cpp index a18ca68d..c5bbbe53 100644 --- a/src/output/output_httpts.cpp +++ b/src/output/output_httpts.cpp @@ -77,7 +77,7 @@ namespace Mist { return; } H.protocol = "HTTP/1.0";//Force HTTP/1.0 because some devices just don't understand chunked replies - H.StartResponse(H, myConn); + H.StartResponse(H, myConn); parseData = true; wantRequest = false; } diff --git a/src/output/output_rtmp.cpp b/src/output/output_rtmp.cpp index f5179fe5..a69e59af 100644 --- a/src/output/output_rtmp.cpp +++ b/src/output/output_rtmp.cpp @@ -1144,7 +1144,7 @@ namespace Mist { case 9: //video data case 18: {//meta data static std::map pushMeta; - static uint64_t lastTagTime = 0; + static std::map lastTagTime; if (!isInitialized) { MEDIUM_MSG("Received useless media data"); onFinish(); @@ -1163,22 +1163,23 @@ namespace Mist { F.toMeta(myMeta, *amf_storage, reTrack); if (F.getDataLen() && !(F.needsInitData() && F.isInitData())){ uint64_t tagTime = next.timestamp; + uint64_t & ltt = lastTagTime[reTrack]; //Check for decreasing timestamps - this is a connection error. //We allow wrapping around the 32 bits maximum value if the most significant 8 bits are set. /// \TODO Provide time continuity for wrap-around. - if (lastTagTime && tagTime < lastTagTime && lastTagTime < 0xFF000000ull){ - FAIL_MSG("Timestamps went from %llu to %llu (decreased): disconnecting!", lastTagTime, tagTime); + if (ltt && tagTime < ltt && ltt < 0xFF000000ull){ + FAIL_MSG("Timestamps went from %llu to %llu (decreased): disconnecting!", ltt, tagTime); onFinish(); break; } //Check if we went more than 10 minutes into the future - if (lastTagTime && tagTime > lastTagTime + 600000){ - FAIL_MSG("Timestamps went from %llu to %llu (> 10m in future): disconnecting!", lastTagTime, tagTime); + if (ltt && tagTime > ltt + 600000){ + FAIL_MSG("Timestamps went from %llu to %llu (> 10m in future): disconnecting!", ltt, tagTime); onFinish(); break; } thisPacket.genericFill(tagTime, F.offset(), reTrack, F.getData(), F.getDataLen(), 0, F.isKeyframe); - lastTagTime = tagTime; + ltt = tagTime; if (!nProxy.userClient.getData()){ char userPageName[NAME_BUFFER_SIZE]; snprintf(userPageName, NAME_BUFFER_SIZE, SHM_USERS, streamName.c_str());