From 24006648f9d3dd28949139489bd4e172f37e827b Mon Sep 17 00:00:00 2001 From: Thulinma Date: Thu, 1 Nov 2018 16:11:47 +0100 Subject: [PATCH 1/2] Cleaned up, fixed and robustified semaphore and signal related code --- lib/config.cpp | 6 +- lib/shared_memory.cpp | 86 +++++++++++------ lib/shared_memory.h | 2 + src/controller/controller.cpp | 3 +- src/input/input.cpp | 171 +++++++++++++++++----------------- src/input/input.h | 2 +- src/input/input_buffer.cpp | 26 +++--- 7 files changed, 165 insertions(+), 131 deletions(-) diff --git a/lib/config.cpp b/lib/config.cpp index dce06ca4..9a389871 100644 --- a/lib/config.cpp +++ b/lib/config.cpp @@ -401,7 +401,7 @@ void Util::Config::activate() { struct sigaction cur_action; new_action.sa_sigaction = signal_handler; sigemptyset(&new_action.sa_mask); - new_action.sa_flags = 0; + new_action.sa_flags = SA_SIGINFO; sigaction(SIGINT, &new_action, NULL); sigaction(SIGHUP, &new_action, NULL); sigaction(SIGTERM, &new_action, NULL); @@ -423,6 +423,10 @@ void Util::Config::signal_handler(int signum, siginfo_t * sigInfo, void * ignore case SIGHUP: case SIGTERM: if (serv_sock_pointer){serv_sock_pointer->close();} +#if DEBUG >= DLVL_DEVEL + static int ctr = 0; + if (!is_active && ++ctr > 4){BACKTRACE;} +#endif is_active = false; default: switch (sigInfo->si_code){ diff --git a/lib/shared_memory.cpp b/lib/shared_memory.cpp index fc2f9548..dd7f169f 100644 --- a/lib/shared_memory.cpp +++ b/lib/shared_memory.cpp @@ -93,6 +93,7 @@ namespace IPC { #else mySem = SEM_FAILED; #endif + isLocked = false; } ///\brief Constructs a named semaphore @@ -106,6 +107,7 @@ namespace IPC { #else mySem = SEM_FAILED; #endif + isLocked = false; open(name, oflag, mode, value, noWait); } @@ -178,9 +180,9 @@ namespace IPC { } #endif } - if (!(*this)) { + if (*this) { + myName = (char *)name; } - myName = (char *)name; } ///\brief Returns the current value of the semaphore @@ -197,12 +199,20 @@ namespace IPC { ///\brief Posts to the semaphore, increases its value by one void semaphore::post() { + if (!*this || !isLocked){ + FAIL_MSG("Attempted to unlock a non-locked semaphore: '%s'!", myName.c_str()); +#if DEBUG >= DLVL_DEVEL + BACKTRACE; +#endif + return; + } if (*this) { #if defined(__CYGWIN__) || defined(_WIN32) ReleaseMutex(mySem); #else sem_post(mySem); #endif + isLocked = false; } } @@ -217,6 +227,7 @@ namespace IPC { tmp = sem_wait(mySem); } while (tmp == -1 && errno == EINTR); #endif + isLocked = true; } } @@ -235,7 +246,7 @@ namespace IPC { result = sem_trywait(mySem); } while (result == -1 && errno == EINTR); #endif - return (result == 0); + return isLocked = (result == 0); } ///\brief Tries to wait for the semaphore for a single second, returns true if successful, false otherwise @@ -254,6 +265,7 @@ namespace IPC { long long unsigned int timeout = now + 1e6; while (now < timeout) { if (0 == sem_trywait(mySem)) { + isLocked = true; return true; } usleep(100e3); @@ -266,12 +278,28 @@ namespace IPC { wt.tv_nsec = 0; result = sem_timedwait(mySem, &wt); #endif - return (result == 0); + return isLocked = (result == 0); } ///\brief Closes the currently opened semaphore void semaphore::close() { if (*this) { + if (isLocked){post();} +#if defined(__CYGWIN__) || defined(_WIN32) + CloseHandle(mySem); + mySem = 0; +#else + sem_close(mySem); + mySem = SEM_FAILED; +#endif + } + myName.clear(); + } + + /// Closes the semaphore, without unlocking it first. + /// Intended to be called from forked child processes, to drop the reference to the semaphore. + void semaphore::abandon() { + if (*this) { #if defined(__CYGWIN__) || defined(_WIN32) CloseHandle(mySem); mySem = 0; @@ -280,19 +308,28 @@ namespace IPC { mySem = SEM_FAILED; #endif } - } - - ///\brief Unlinks the previously opened semaphore - void semaphore::unlink() { - close(); -#if !defined(__CYGWIN__) && !defined(_WIN32) - if (myName.size()) { - sem_unlink(myName.c_str()); - } -#endif myName.clear(); } + /// Unlinks the previously opened semaphore, closing it (if open) in the process. + void semaphore::unlink() { +#if defined(__CYGWIN__) || defined(_WIN32) + if (isLocked){post();} +#endif +#if !defined(__CYGWIN__) && !defined(_WIN32) + if (myName.size()){sem_unlink(myName.c_str());} +#endif + if (*this) { +#if defined(__CYGWIN__) || defined(_WIN32) + CloseHandle(mySem); + mySem = 0; +#else + sem_close(mySem); + mySem = SEM_FAILED; +#endif + } + myName.clear(); + } #if defined(__CYGWIN__) || defined(_WIN32) SECURITY_ATTRIBUTES semaphore::getSecurityAttributes() { @@ -368,7 +405,7 @@ namespace IPC { ///\brief Unmaps a shared page if allowed void sharedPage::unmap() { - if (mapped && len) { + if (mapped) { #if defined(__CYGWIN__) || defined(_WIN32) //under Cygwin, the mapped location is shifted by 4 to contain the page size. UnmapViewOfFile(mapped - 4); @@ -795,26 +832,19 @@ namespace IPC { baseName = "/" + name; payLen = len; hasCounter = withCounter; - mySemaphore.open(baseName.c_str(), O_CREAT | O_EXCL | O_RDWR, ACCESSPERMS, 1); - if (!mySemaphore) { - mySemaphore.open(baseName.c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1); - } + mySemaphore.open(baseName.c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1); if (!mySemaphore) { DEBUG_MSG(DLVL_FAIL, "Creating semaphore failed: %s", strerror(errno)); return; + }else{ + semGuard tmpGuard(&mySemaphore); + amount = 0; + newPage(); } - if (!mySemaphore.tryWaitOneSecond()){ - WARN_MSG("Force unlocking sharedServer semaphore to prevent deadlock"); - } - mySemaphore.post(); - semGuard tmpGuard(&mySemaphore); - amount = 0; - newPage(); } ///\brief The deconstructor sharedServer::~sharedServer() { - mySemaphore.close(); mySemaphore.unlink(); } @@ -1251,7 +1281,7 @@ namespace IPC { if (!hasCounter) { return (myPage.mapped != 0); } - if (myPage.mapped){ + if (myPage.mapped && offsetOnPage >= 0){ return (myPage.mapped[offsetOnPage] & 0x7F) < 60; } return false; diff --git a/lib/shared_memory.h b/lib/shared_memory.h index 0b7f23f2..fe146910 100644 --- a/lib/shared_memory.h +++ b/lib/shared_memory.h @@ -75,6 +75,7 @@ namespace IPC { bool tryWait(); bool tryWaitOneSecond(); void close(); + void abandon(); void unlink(); private: #if defined(__CYGWIN__) || defined(_WIN32) @@ -84,6 +85,7 @@ namespace IPC { #else sem_t * mySem; #endif + bool isLocked; std::string myName; }; diff --git a/src/controller/controller.cpp b/src/controller/controller.cpp index b63c2af9..7ce1e016 100644 --- a/src/controller/controller.cpp +++ b/src/controller/controller.cpp @@ -83,7 +83,8 @@ void statusMonitor(void *np){ WARN_MSG("Configuration semaphore was stuck. Force-unlocking it and re-writing config."); changed = true; } - configLock.post(); + configLock.unlink(); + configLock.open(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); if (changed || Controller::configChanged){ Controller::writeConfig(); Controller::configChanged = false; diff --git a/src/input/input.cpp b/src/input/input.cpp index 5d50cb7f..d322ac9c 100644 --- a/src/input/input.cpp +++ b/src/input/input.cpp @@ -109,41 +109,77 @@ namespace Mist { } IPC::semaphore playerLock; - if (needsLock() && streamName.size()){ - char semName[NAME_BUFFER_SIZE]; - snprintf(semName, NAME_BUFFER_SIZE, SEM_INPUT, streamName.c_str()); - playerLock.open(semName, O_CREAT | O_RDWR, ACCESSPERMS, 1); - if (!playerLock.tryWait()){ - DEBUG_MSG(DLVL_DEVEL, "A player for stream %s is already running", streamName.c_str()); - return 1; + IPC::semaphore pullLock; + + //If we're not converting, we might need a lock. + if (streamName.size()){ + if (needsLock()){ + //needsLock() == true means this input is the sole responsible input for a stream + //That means it's MistInBuffer for live, or the actual input binary for VoD + //For these cases, we lock the SEM_INPUT semaphore. + char semName[NAME_BUFFER_SIZE]; + snprintf(semName, NAME_BUFFER_SIZE, SEM_INPUT, streamName.c_str()); + playerLock.open(semName, O_CREAT | O_RDWR, ACCESSPERMS, 1); + if (!playerLock.tryWait()){ + INFO_MSG("A player for stream %s is already running", streamName.c_str()); + playerLock.close(); + return 1; + } + char pageName[NAME_BUFFER_SIZE]; + snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_STATE, streamName.c_str()); + streamStatus.init(pageName, 1, true, false); + if (streamStatus){streamStatus.mapped[0] = STRMSTAT_INIT;} + streamStatus.master = false; + streamStatus.close(); + }else{ + //needsLock() == false means this binary will itself start the sole responsible input + //So, we definitely do NOT lock SEM_INPUT, since the child process will do that later. + //However, most of these processes are singular, meaning they expect to be the only source of data. + //To prevent multiple singular processes starting, we use the MstPull semaphore if this input + //is indeed a singular input type. + if (isSingular()){ + pullLock.open(std::string("/MstPull_" + streamName).c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1); + if (!pullLock){ + FAIL_MSG("Could not open pull lock for stream '%s' - aborting!", streamName.c_str()); + return 1; + } + if (!pullLock.tryWait()){ + WARN_MSG("A pull process for stream %s is already running", streamName.c_str()); + pullLock.close(); + return 1; + } + } } - char pageName[NAME_BUFFER_SIZE]; - snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_STATE, streamName.c_str()); - streamStatus.init(pageName, 1, true, false); - if (streamStatus){streamStatus.mapped[0] = STRMSTAT_INIT;} - streamStatus.master = false; - streamStatus.close(); } + config->activate(); uint64_t reTimer = 0; while (config->is_active){ pid_t pid = fork(); if (pid == 0){ - //Re-init streamStatus, previously closed - char pageName[NAME_BUFFER_SIZE]; - snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_STATE, streamName.c_str()); - streamStatus.init(pageName, 1, true, false); - streamStatus.master = false; - if (streamStatus){streamStatus.mapped[0] = STRMSTAT_INIT;} - if (needsLock()){playerLock.close();} + if (playerLock){ + //Re-init streamStatus, previously closed + char pageName[NAME_BUFFER_SIZE]; + snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_STATE, streamName.c_str()); + streamStatus.init(pageName, 1, true, false); + streamStatus.master = false; + if (streamStatus){streamStatus.mapped[0] = STRMSTAT_INIT;} + } + //Abandon all semaphores, ye who enter here. + playerLock.abandon(); + pullLock.abandon(); if (!preRun()){return 0;} return run(); } if (pid == -1){ FAIL_MSG("Unable to spawn input process"); - if (needsLock()){playerLock.post();} + //We failed. Release the kra... semaphores! + //post() contains an is-open check already, no need to double-check. + playerLock.unlink(); + pullLock.unlink(); return 2; } + HIGH_MSG("Waiting for child for stream %s", streamName.c_str()); //wait for the process to exit int status; while (waitpid(pid, &status, 0) != pid && errno == EINTR){ @@ -153,35 +189,38 @@ namespace Mist { } continue; } + HIGH_MSG("Done waiting for child for stream %s", streamName.c_str()); //if the exit was clean, don't restart it if (WIFEXITED(status) && (WEXITSTATUS(status) == 0)){ INFO_MSG("Input for stream %s shut down cleanly", streamName.c_str()); break; } - char pageName[NAME_BUFFER_SIZE]; - snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_STATE, streamName.c_str()); - streamStatus.init(pageName, 1, true, false); - if (streamStatus){streamStatus.mapped[0] = STRMSTAT_INVALID;} + if (playerLock){ + char pageName[NAME_BUFFER_SIZE]; + snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_STATE, streamName.c_str()); + streamStatus.init(pageName, 1, true, false); + if (streamStatus){streamStatus.mapped[0] = STRMSTAT_INVALID;} + } #if DEBUG >= DLVL_DEVEL - WARN_MSG("Aborting autoclean; this is a development build."); - INFO_MSG("Input for stream %s uncleanly shut down! Aborting restart; this is a development build.", streamName.c_str()); + WARN_MSG("Input for stream %s uncleanly shut down! Aborting restart; this is a development build.", streamName.c_str()); break; #else + WARN_MSG("Input for stream %s uncleanly shut down! Restarting...", streamName.c_str()); onCrash(); - INFO_MSG("Input for stream %s uncleanly shut down! Restarting...", streamName.c_str()); Util::wait(reTimer); reTimer += 1000; #endif } - if (needsLock()){ - playerLock.post(); + + if (playerLock){ playerLock.unlink(); - playerLock.close(); + char pageName[NAME_BUFFER_SIZE]; + snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_STATE, streamName.c_str()); + streamStatus.init(pageName, 1, true, false); + streamStatus.close(); } - char pageName[NAME_BUFFER_SIZE]; - snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_STATE, streamName.c_str()); - streamStatus.init(pageName, 1, true, false); - streamStatus.close(); + pullLock.unlink(); + HIGH_MSG("Angel process for %s exiting", streamName.c_str()); return 0; } @@ -207,29 +246,21 @@ namespace Mist { } if (!streamName.size()) { + //If we don't have a stream name, that means we're in stand-alone conversion mode. MEDIUM_MSG("Starting convert"); convert(); } else if (!needsLock()) { + //We have a name and aren't the sole process. That means we're streaming live data to a buffer. MEDIUM_MSG("Starting stream"); stream(); }else{ + //We are the sole process and have a name. That means this is a Buffer or VoD input. MEDIUM_MSG("Starting serve"); serve(); } return 0; } - /// Default crash handler, cleans up Pull semaphore on crashes - void Input::onCrash(){ - if (streamName.size() && !needsLock()) { - //we have a Pull semaphore to clean up, do it - IPC::semaphore pullLock; - pullLock.open(std::string("/MstPull_" + streamName).c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1); - pullLock.close(); - pullLock.unlink(); - } - } - void Input::convert() { //check filename for no - if (config->getString("output") != "-"){ @@ -296,7 +327,7 @@ namespace Mist { userPage.init(userPageName, PLAY_EX_SIZE, true); if (streamStatus){streamStatus.mapped[0] = STRMSTAT_READY;} - DEBUG_MSG(DLVL_DEVEL, "Input for stream %s started", streamName.c_str()); + INFO_MSG("Input for stream %s started", streamName.c_str()); activityCounter = Util::bootSecs(); //main serve loop while (keepRunning()) { @@ -321,7 +352,7 @@ namespace Mist { if (streamStatus){streamStatus.mapped[0] = STRMSTAT_SHUTDOWN;} config->is_active = false; finish(); - DEBUG_MSG(DLVL_DEVEL, "Input for stream %s closing clean", streamName.c_str()); + INFO_MSG("Input for stream %s closing clean", streamName.c_str()); userPage.finishEach(); //end player functionality } @@ -352,27 +383,10 @@ namespace Mist { /// - if there are tracks, register as a non-viewer on the user page of the buffer /// - call getNext() in a loop, buffering packets void Input::stream(){ - IPC::semaphore pullLock; - if(isSingular()){ - pullLock.open(std::string("/MstPull_" + streamName).c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1); - if (!pullLock){ - FAIL_MSG("Could not open pull lock for stream '%s' - aborting!", streamName.c_str()); - return; - } - - if (!pullLock.tryWait()){ - WARN_MSG("A pull process for stream %s is already running", streamName.c_str()); - pullLock.close(); - return; - } - - if (Util::streamAlive(streamName)){ - pullLock.post(); - pullLock.close(); - pullLock.unlink(); - WARN_MSG("Stream already online, cancelling"); - return; - } + + if (Util::streamAlive(streamName)){ + WARN_MSG("Stream already online, cancelling"); + return; } std::map overrides; @@ -382,11 +396,6 @@ namespace Mist { } if (!Util::startInput(streamName, "push://INTERNAL_ONLY:"+config->getString("input"), true, true, overrides)) {//manually override stream url to start the buffer - if(isSingular()){ - pullLock.post(); - pullLock.close(); - pullLock.unlink(); - } WARN_MSG("Could not start buffer, cancelling"); return; } @@ -396,10 +405,6 @@ namespace Mist { if (!openStreamSource()){ FAIL_MSG("Unable to connect to source"); - if(isSingular()){ - pullLock.post(); - pullLock.close(); - } return; } @@ -413,11 +418,6 @@ namespace Mist { if (myMeta.tracks.size() == 0){ nProxy.userClient.finish(); finish(); - if(isSingular()){ - pullLock.post(); - pullLock.close(); - pullLock.unlink(); - } INFO_MSG("No tracks found, cancelling"); return; } @@ -434,11 +434,6 @@ namespace Mist { nProxy.userClient.finish(); finish(); - if(isSingular()){ - pullLock.post(); - pullLock.close(); - pullLock.unlink(); - } INFO_MSG("Stream input %s closing clean; reason: %s", streamName.c_str(), reason.c_str()); return; } diff --git a/src/input/input.h b/src/input/input.h index 30727947..3bfe22af 100644 --- a/src/input/input.h +++ b/src/input/input.h @@ -20,7 +20,7 @@ namespace Mist { public: Input(Util::Config * cfg); virtual int run(); - virtual void onCrash(); + virtual void onCrash(){} virtual int boot(int argc, char * argv[]); virtual ~Input() {}; diff --git a/src/input/input_buffer.cpp b/src/input/input_buffer.cpp index 027da501..e1c6147e 100644 --- a/src/input/input_buffer.cpp +++ b/src/input/input_buffer.cpp @@ -463,7 +463,7 @@ namespace Mist { } //Track is set to "New track request", assign new track id and create shared memory page //This indicates that the 'current key' part of the element is set to contain the original track id from the pushing process - if (value & 0x80000000) { + if (config->is_active && (value & 0x80000000)) { if (value & 0x40000000) { unsigned long finalMap = value & ~0xC0000000; //Register the new track as an active track. @@ -480,9 +480,15 @@ namespace Mist { char tempMetaName[NAME_BUFFER_SIZE]; snprintf(tempMetaName, NAME_BUFFER_SIZE, SHM_TRACK_META, config->getString("streamname").c_str(), finalMap); - tMeta.init(tempMetaName, 8388608, false); + tMeta.init(tempMetaName, 8388608, false, false); + if (!tMeta){continue;}//abort for now if page doesn't exist yet - //The page exist, now we try to read in the metadata of the track + char firstPage[NAME_BUFFER_SIZE]; + snprintf(firstPage, NAME_BUFFER_SIZE, SHM_TRACK_INDEX, config->getString("streamname").c_str(), finalMap); + nProxy.metaPages[finalMap].init(firstPage, SHM_TRACK_INDEX_SIZE, false, false); + if (!nProxy.metaPages[finalMap]){continue;}//abort for now if page doesn't exist yet + + //The pages exist, now we try to read in the metadata of the track //Store the size of the dtsc packet to read. unsigned int len = ntohl(((int *)tMeta.mapped)[1]); @@ -505,11 +511,6 @@ namespace Mist { userConn.setTrackId(index, finalMap); userConn.setKeynum(index, 0x0000); - - char firstPage[NAME_BUFFER_SIZE]; - snprintf(firstPage, NAME_BUFFER_SIZE, SHM_TRACK_INDEX, config->getString("streamname").c_str(), finalMap); - nProxy.metaPages[finalMap].init(firstPage, SHM_TRACK_INDEX_SIZE, false); - //Update the metadata for this track updateTrackMeta(finalMap); hasPush = true; @@ -536,7 +537,7 @@ namespace Mist { } //The track id is set to the value of a track that we are currently negotiating about - if (negotiatingTracks.count(value)) { + if (config->is_active && negotiatingTracks.count(value)) { //If the metadata page for this track is not yet registered, initialize it if (!nProxy.metaPages.count(value) || !nProxy.metaPages[value].mapped) { char tempMetaName[NAME_BUFFER_SIZE]; @@ -740,7 +741,10 @@ namespace Mist { strName = strName.substr(0, (strName.find_first_of("+ "))); IPC::sharedPage serverCfg(SHM_CONF, DEFAULT_CONF_PAGE_SIZE, false, false); ///< Contains server configuration and capabilities IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); - configLock.wait(); + if (!configLock.tryWaitOneSecond()){ + INFO_MSG("Aborting stream config refresh: locking took longer than expected"); + return false; + } DTSC::Scan streamCfg = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("streams").getMember(strName); long long tmpNum; @@ -783,8 +787,6 @@ namespace Mist { resumeMode = tmpNum; } - configLock.post(); - configLock.close(); return true; } From 425e98c6fd46ae977ac37f840c56cd05c9c27288 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Thu, 1 Nov 2018 16:25:10 +0100 Subject: [PATCH 2/2] Fixed TS-output (=HLS) related SIGABRT problem --- src/output/output_ts_base.cpp | 163 ++++++++++++++++++---------------- src/output/output_ts_base.h | 2 - 2 files changed, 86 insertions(+), 79 deletions(-) diff --git a/src/output/output_ts_base.cpp b/src/output/output_ts_base.cpp index ff0c47bd..6df6cff9 100644 --- a/src/output/output_ts_base.cpp +++ b/src/output/output_ts_base.cpp @@ -1,9 +1,9 @@ #include "output_ts_base.h" +#include namespace Mist { TSOutput::TSOutput(Socket::Connection & conn) : TS_BASECLASS(conn){ packCounter=0; - haveAvcc = false; ts_from = 0; setBlocking(true); sendRepeatingHeaders = 0; @@ -48,7 +48,7 @@ namespace Mist { } } - int tmp = packData.fillFree(data, dataLen); + size_t tmp = packData.fillFree(data, dataLen); data += tmp; dataLen -= tmp; } while(dataLen); @@ -67,8 +67,9 @@ namespace Mist { firstPack = true; char * dataPointer = 0; - unsigned int dataLen = 0; - thisPacket.getString("data", dataPointer, dataLen); //data + unsigned int tmpDataLen = 0; + thisPacket.getString("data", dataPointer, tmpDataLen); //data + uint64_t dataLen = tmpDataLen; //apple compatibility timestamp correction if (appleCompat){ packTime -= ts_from; @@ -80,98 +81,106 @@ namespace Mist { std::string bs; //prepare bufferstring if (video){ - unsigned int extraSize = 0; - //dataPointer[4] & 0x1f is used to check if this should be done later: fillPacket("\000\000\000\001\011\360", 6); - if (Trk.codec == "H264" && (dataPointer[4] & 0x1f) != 0x09){ - extraSize += 6; - } - if (keyframe){ - if (Trk.codec == "H264"){ - if (!haveAvcc){ + if (Trk.codec == "H264"){ + unsigned int extraSize = 0; + //dataPointer[4] & 0x1f is used to check if this should be done later: fillPacket("\000\000\000\001\011\360", 6); + if (Trk.codec == "H264" && (dataPointer[4] & 0x1f) != 0x09){ + extraSize += 6; + } + if (keyframe){ + if (Trk.codec == "H264"){ + MP4::AVCC avccbox; avccbox.setPayload(Trk.init); - haveAvcc = true; + bs = avccbox.asAnnexB(); + extraSize += bs.size(); } - bs = avccbox.asAnnexB(); - extraSize += bs.size(); } - } - - unsigned int watKunnenWeIn1Ding = 65490-13; - unsigned int splitCount = (dataLen+extraSize) / watKunnenWeIn1Ding; - unsigned int currPack = 0; - unsigned int ThisNaluSize = 0; - unsigned int i = 0; - unsigned int nalLead = 0; - uint64_t offset = thisPacket.getInt("offset") * 90; + + unsigned int watKunnenWeIn1Ding = 65490-13; + unsigned int splitCount = (dataLen+extraSize) / watKunnenWeIn1Ding; + unsigned int currPack = 0; + uint64_t ThisNaluSize = 0; + unsigned int i = 0; + unsigned int nalLead = 0; + uint64_t offset = thisPacket.getInt("offset") * 90; - while (currPack <= splitCount){ - unsigned int alreadySent = 0; - bs = TS::Packet::getPESVideoLeadIn((currPack != splitCount ? watKunnenWeIn1Ding : dataLen+extraSize - currPack*watKunnenWeIn1Ding), packTime, offset, !currPack, Trk.bps); - fillPacket(bs.data(), bs.size(), firstPack, video, keyframe, pkgPid, contPkg); - if (!currPack){ - if (Trk.codec == "H264" && (dataPointer[4] & 0x1f) != 0x09){ - //End of previous nal unit, if not already present - fillPacket("\000\000\000\001\011\360", 6, firstPack, video, keyframe, pkgPid, contPkg); - alreadySent += 6; - } - if (keyframe){ - if (Trk.codec == "H264"){ - bs = avccbox.asAnnexB(); - fillPacket(bs.data(), bs.size(), firstPack, video, keyframe, pkgPid, contPkg); - alreadySent += bs.size(); + while (currPack <= splitCount){ + unsigned int alreadySent = 0; + bs = TS::Packet::getPESVideoLeadIn((currPack != splitCount ? watKunnenWeIn1Ding : dataLen+extraSize - currPack*watKunnenWeIn1Ding), packTime, offset, !currPack, Trk.bps); + fillPacket(bs.data(), bs.size(), firstPack, video, keyframe, pkgPid, contPkg); + if (!currPack){ + if (Trk.codec == "H264" && (dataPointer[4] & 0x1f) != 0x09){ + //End of previous nal unit, if not already present + fillPacket("\000\000\000\001\011\360", 6, firstPack, video, keyframe, pkgPid, contPkg); + alreadySent += 6; + } + if (keyframe){ + if (Trk.codec == "H264"){ + MP4::AVCC avccbox; + avccbox.setPayload(Trk.init); + bs = avccbox.asAnnexB(); + fillPacket(bs.data(), bs.size(), firstPack, video, keyframe, pkgPid, contPkg); + alreadySent += bs.size(); + } } } - } - while (i + 4 < (unsigned int)dataLen){ - if (nalLead){ - fillPacket("\000\000\000\001"+4-nalLead,nalLead, firstPack, video, keyframe, pkgPid, contPkg); - i += nalLead; - alreadySent += nalLead; - nalLead = 0; - } - if (!ThisNaluSize){ - ThisNaluSize = (dataPointer[i] << 24) + (dataPointer[i+1] << 16) + (dataPointer[i+2] << 8) + dataPointer[i+3]; - if (ThisNaluSize + i + 4 > (unsigned int)dataLen){ - DEBUG_MSG(DLVL_WARN, "Too big NALU detected (%u > %d) - skipping!", ThisNaluSize + i + 4, dataLen); - break; + while (i + 4 < (unsigned int)dataLen){ + if (nalLead){ + fillPacket("\000\000\000\001"+4-nalLead,nalLead, firstPack, video, keyframe, pkgPid, contPkg); + i += nalLead; + alreadySent += nalLead; + nalLead = 0; } - if (alreadySent + 4 > watKunnenWeIn1Ding){ - nalLead = 4 - (watKunnenWeIn1Ding-alreadySent); - fillPacket("\000\000\000\001",watKunnenWeIn1Ding-alreadySent, firstPack, video, keyframe, pkgPid, contPkg); + if (!ThisNaluSize){ + ThisNaluSize = Bit::btohl(dataPointer + i); + if (ThisNaluSize + i + 4 > dataLen){ + WARN_MSG("Too big NALU detected (%" PRIu64 " > %" PRIu64 ") - skipping!", ThisNaluSize + i + 4, dataLen); + break; + } + if (alreadySent + 4 > watKunnenWeIn1Ding){ + nalLead = 4 - (watKunnenWeIn1Ding-alreadySent); + fillPacket("\000\000\000\001",watKunnenWeIn1Ding-alreadySent, firstPack, video, keyframe, pkgPid, contPkg); + i += watKunnenWeIn1Ding-alreadySent; + alreadySent += watKunnenWeIn1Ding-alreadySent; + }else{ + fillPacket("\000\000\000\001",4, firstPack, video, keyframe, pkgPid, contPkg); + alreadySent += 4; + i += 4; + } + } + if (alreadySent + ThisNaluSize > watKunnenWeIn1Ding){ + fillPacket(dataPointer+i,watKunnenWeIn1Ding-alreadySent, firstPack, video, keyframe, pkgPid, contPkg); i += watKunnenWeIn1Ding-alreadySent; + ThisNaluSize -= watKunnenWeIn1Ding-alreadySent; alreadySent += watKunnenWeIn1Ding-alreadySent; }else{ - fillPacket("\000\000\000\001",4, firstPack, video, keyframe, pkgPid, contPkg); - alreadySent += 4; - i += 4; + fillPacket(dataPointer+i,ThisNaluSize, firstPack, video, keyframe, pkgPid, contPkg); + alreadySent += ThisNaluSize; + i += ThisNaluSize; + ThisNaluSize = 0; + } + if (alreadySent == watKunnenWeIn1Ding){ + packData.addStuffing(); + fillPacket(0, 0, firstPack, video, keyframe, pkgPid, contPkg); + firstPack = true; + break; } } - if (alreadySent + ThisNaluSize > watKunnenWeIn1Ding){ - fillPacket(dataPointer+i,watKunnenWeIn1Ding-alreadySent, firstPack, video, keyframe, pkgPid, contPkg); - i += watKunnenWeIn1Ding-alreadySent; - ThisNaluSize -= watKunnenWeIn1Ding-alreadySent; - alreadySent += watKunnenWeIn1Ding-alreadySent; - }else{ - fillPacket(dataPointer+i,ThisNaluSize, firstPack, video, keyframe, pkgPid, contPkg); - alreadySent += ThisNaluSize; - i += ThisNaluSize; - ThisNaluSize = 0; - } - if (alreadySent == watKunnenWeIn1Ding){ - packData.addStuffing(); - fillPacket(0, 0, firstPack, video, keyframe, pkgPid, contPkg); - firstPack = true; - break; - } + currPack++; } - currPack++; + }else{ + uint64_t offset = thisPacket.getInt("offset") * 90; + bs = TS::Packet::getPESVideoLeadIn(0, packTime, offset, true, Trk.bps); + fillPacket(bs.data(), bs.size(), firstPack, video, keyframe, pkgPid, contPkg); + + fillPacket(dataPointer, dataLen, firstPack, video, keyframe, pkgPid, contPkg); } }else if (Trk.type == "audio"){ long unsigned int tempLen = dataLen; if (Trk.codec == "AAC"){ tempLen += 7; } - bs = TS::Packet::getPESAudioLeadIn(tempLen, packTime, Trk.bps);// myMeta.tracks[thisPacket.getTrackId()].rate / 1000 ); + bs = TS::Packet::getPESAudioLeadIn(tempLen, packTime, Trk.bps); fillPacket(bs.data(), bs.size(), firstPack, video, keyframe, pkgPid, contPkg); if (Trk.codec == "AAC"){ bs = TS::getAudioHeader(dataLen, Trk.init); diff --git a/src/output/output_ts_base.h b/src/output/output_ts_base.h index 37e3a4eb..d1b041ef 100644 --- a/src/output/output_ts_base.h +++ b/src/output/output_ts_base.h @@ -25,8 +25,6 @@ namespace Mist { int contSDT; unsigned int packCounter; ///\todo update constructors? TS::Packet packData; - bool haveAvcc; - MP4::AVCC avccbox; bool appleCompat; uint64_t sendRepeatingHeaders; ///< Amount of ms between PAT/PMT. Zero means do not repeat. uint64_t lastHeaderTime; ///< Timestamp last PAT/PMT were sent.