diff --git a/lib/comms.cpp b/lib/comms.cpp index b0eb028d..33564268 100644 --- a/lib/comms.cpp +++ b/lib/comms.cpp @@ -19,9 +19,10 @@ namespace Comms{ uint8_t defaultCommFlags = 0; /// \brief Refreshes the session configuration if the last update was more than 5 seconds ago - void sessionConfigCache(){ + void sessionConfigCache(uint64_t bootMs){ static uint64_t lastUpdate = 0; - if (Util::bootSecs() > lastUpdate + 5){ + if (!bootMs){bootMs = Util::bootMS();} + if (bootMs > lastUpdate + 5000){ VERYHIGH_MSG("Updating session config"); JSON::Value tmpVal = Util::getGlobalConfig("sessionViewerMode"); if (!tmpVal.isNull()){ sessionViewerMode = tmpVal.asInt(); } @@ -35,7 +36,7 @@ namespace Comms{ if (!tmpVal.isNull()){ sessionStreamInfoMode = tmpVal.asInt(); } tmpVal = Util::getGlobalConfig("tknMode"); if (!tmpVal.isNull()){ tknMode = tmpVal.asInt(); } - lastUpdate = Util::bootSecs(); + lastUpdate = bootMs; } } diff --git a/lib/comms.h b/lib/comms.h index 491bda9b..06d4a60a 100644 --- a/lib/comms.h +++ b/lib/comms.h @@ -27,7 +27,7 @@ namespace Comms{ extern uint8_t sessionStreamInfoMode; extern uint8_t tknMode; extern uint8_t defaultCommFlags; - void sessionConfigCache(); + void sessionConfigCache(uint64_t bootMs = 0); class Comms{ public: diff --git a/src/output/output.cpp b/src/output/output.cpp index 6ad37cd8..8d64076d 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -1670,7 +1670,8 @@ namespace Mist{ /*LTS-END*/ DONTEVEN_MSG("MistOut client handler started"); while (keepGoing() && (wantRequest || parseData)){ - Comms::sessionConfigCache(); + thisBootMs = Util::bootMS(); + Comms::sessionConfigCache(thisBootMs); if (wantRequest){requestHandler();} if (parseData){ if (!isInitialized){ @@ -2273,7 +2274,7 @@ namespace Mist{ if (!isInitialized){return;} // also cancel if it has been less than a second since the last update // unless force is set to true - uint64_t now = Util::bootSecs(); + uint64_t now = thisBootMs / 1000; if (now <= lastStats && !force){return;} if (isRecording()){ diff --git a/src/output/output.h b/src/output/output.h index 24637194..ee1555a5 100644 --- a/src/output/output.h +++ b/src/output/output.h @@ -160,6 +160,7 @@ namespace Mist{ uint64_t firstPacketTime; uint64_t lastPacketTime; + uint64_t thisBootMs; std::map curPage; ///< For each track, holds the page that is currently being written. }; diff --git a/src/output/output_webrtc.cpp b/src/output/output_webrtc.cpp index c02fcb83..5f4b5db3 100644 --- a/src/output/output_webrtc.cpp +++ b/src/output/output_webrtc.cpp @@ -24,17 +24,30 @@ namespace Mist{ OutWebRTC *classPointer = 0; - /* ------------------------------------------------ */ + static void webRTCInputOutputThreadFunc(void *arg){ + classPointer->handleWebRTCInputOutputFromThread(); + } + + static void onRTPSorterHasPacketCallback(const uint64_t track, const RTP::Packet &p){ + classPointer->onRTPSorterHasPacket(track, p); + } + + static void onDTSCConverterHasInitDataCallback(const uint64_t track, const std::string &initData){ + classPointer->onDTSCConverterHasInitData(track, initData); + } + + static void onDTSCConverterHasPacketCallback(const DTSC::Packet &pkt){ + classPointer->onDTSCConverterHasPacket(pkt); + } + + static void onRTPPacketizerHasDataCallback(void *socket, const char *data, size_t len, uint8_t channel){ + classPointer->onRTPPacketizerHasRTPPacket(data, len); + } + + static void onRTPPacketizerHasRTCPDataCallback(void *socket, const char *data, size_t len, uint8_t){ + classPointer->onRTPPacketizerHasRTCPPacket(data, len); + } - static uint32_t generateSSRC(); - static void webRTCInputOutputThreadFunc(void *arg); - static void onDTSCConverterHasPacketCallback(const DTSC::Packet &pkt); - static void onDTSCConverterHasInitDataCallback(const uint64_t track, const std::string &initData); - static void onRTPSorterHasPacketCallback(const uint64_t track, - const RTP::Packet &p); // when we receive RTP packets we store them in a sorter. Whenever there is a valid, - // sorted RTP packet that can be used this function is called. - static void onRTPPacketizerHasDataCallback(void *socket, const char *data, size_t len, uint8_t channel); - static void onRTPPacketizerHasRTCPDataCallback(void *socket, const char *data, size_t nbytes, uint8_t channel); #ifdef WITH_DATACHANNELS static int sctp_recv_cb(struct socket *s, union sctp_sockstore addr, void *data, size_t datalen, struct sctp_rcvinfo rcv, int flags, void *ulp_info){ @@ -161,9 +174,9 @@ namespace Mist{ lastTimeSync = 0; maxSkipAhead = 0; needsLookAhead = 0; - webRTCInputOutputThread = NULL; + ioThread = 0; udpPort = 0; - SSRC = generateSSRC(); + Util::getRandomBytes(&SSRC, sizeof(SSRC)); rtcpTimeoutInMillis = 0; rtcpKeyFrameDelayInMillis = 2000; rtcpKeyFrameTimeoutInMillis = 0; @@ -226,11 +239,10 @@ namespace Mist{ } OutWebRTC::~OutWebRTC(){ - - if (webRTCInputOutputThread && webRTCInputOutputThread->joinable()){ - webRTCInputOutputThread->join(); - delete webRTCInputOutputThread; - webRTCInputOutputThread = NULL; + if (ioThread && ioThread->joinable()){ + ioThread->join(); + delete ioThread; + ioThread = 0; } } @@ -402,9 +414,13 @@ namespace Mist{ void OutWebRTC::requestHandler(){ if (noSignalling){ // For WHEP, make sure we keep listening for packets while waiting for new data to come in for sending - if (parseData && !handleWebRTCInputOutput()){sendPaced(10);} - //After 10s of no packets, abort - if (Util::bootMS() > lastRecv + 10000){ + if (parseData && !handleWebRTCInputOutput()){ + sendPaced(10); + }else{ + if (ioThread){Util::sleep(500);} + } + // After 10s of no packets, abort + if (thisBootMs > lastRecv + 10000){ Util::logExitReason(ER_CLEAN_INACTIVE, "received no data for 10+ seconds"); config->is_active = false; } @@ -922,8 +938,8 @@ namespace Mist{ if (!meta.getBootMsOffset()){meta.setBootMsOffset(Util::bootMS());} - if (webRTCInputOutputThread != NULL){ - FAIL_MSG("It seems that we're already have a webrtc i/o thread running."); + if (ioThread){ + FAIL_MSG("It seems that we're already have a WebRTC I/O thread running, aborting input request"); return false; } @@ -1038,7 +1054,7 @@ namespace Mist{ // start our receive thread (handles STUN, DTLS, RTP input) rtcpTimeoutInMillis = Util::bootMS() + 2000; rtcpKeyFrameTimeoutInMillis = Util::bootMS() + 2000; - webRTCInputOutputThread = new tthread::thread(webRTCInputOutputThreadFunc, NULL); + ioThread = new tthread::thread(webRTCInputOutputThreadFunc, NULL); idleInterval = 1000; @@ -1105,9 +1121,7 @@ namespace Mist{ /* ------------------------------------------------ */ - // This function is called from the `webRTCInputOutputThreadFunc()` - // function. The `webRTCInputOutputThreadFunc()` is basically empty - // and all work for the thread is done here. + /// Worker function for ioThread void OutWebRTC::handleWebRTCInputOutputFromThread(){ while (keepGoing()){ if (!handleWebRTCInputOutput()){sendPaced(10);} @@ -2303,71 +2317,4 @@ namespace Mist{ } } - /* ------------------------------------------------ */ - - // This is our thread function that is started right before we - // call `allowPush()` and send our answer SDP back to the - // client. - static void webRTCInputOutputThreadFunc(void *arg){ - if (!classPointer){ - FAIL_MSG("classPointer hasn't been set. Exiting thread."); - return; - } - classPointer->handleWebRTCInputOutputFromThread(); - } - - static void onRTPSorterHasPacketCallback(const uint64_t track, const RTP::Packet &p){ - if (!classPointer){ - FAIL_MSG("We received a sorted RTP packet but our `classPointer` is invalid."); - return; - } - classPointer->onRTPSorterHasPacket(track, p); - } - - static void onDTSCConverterHasInitDataCallback(const uint64_t track, const std::string &initData){ - if (!classPointer){ - FAIL_MSG("Received a init data, but our `classPointer` is invalid."); - return; - } - classPointer->onDTSCConverterHasInitData(track, initData); - } - - static void onDTSCConverterHasPacketCallback(const DTSC::Packet &pkt){ - if (!classPointer){ - FAIL_MSG("Received a DTSC packet that was created from RTP data, but our `classPointer` is " - "invalid."); - return; - } - classPointer->onDTSCConverterHasPacket(pkt); - } - - static void onRTPPacketizerHasDataCallback(void *socket, const char *data, size_t len, uint8_t channel){ - if (!classPointer){ - FAIL_MSG("Received a RTP packet but our `classPointer` is invalid."); - return; - } - classPointer->onRTPPacketizerHasRTPPacket(data, len); - } - - static void onRTPPacketizerHasRTCPDataCallback(void *socket, const char *data, size_t len, uint8_t){ - if (!classPointer){ - FAIL_MSG("Received a RTCP packet, but out `classPointer` is invalid."); - return; - } - classPointer->onRTPPacketizerHasRTCPPacket(data, len); - } - - static uint32_t generateSSRC(){ - - uint32_t ssrc = 0; - - do{ - ssrc = rand(); - ssrc = ssrc << 16; - ssrc += rand(); - }while (ssrc == 0 || ssrc == 0xffffffff); - - return ssrc; - } - }// namespace Mist diff --git a/src/output/output_webrtc.h b/src/output/output_webrtc.h index 7f587315..03eea959 100644 --- a/src/output/output_webrtc.h +++ b/src/output/output_webrtc.h @@ -163,7 +163,7 @@ namespace Mist{ ///< messages to which we need to reply. std::map webrtcTracks; ///< WebRTCTracks indexed by payload type for incoming data and indexed by ///< myMeta.tracks[].trackID for outgoing data. - tthread::thread *webRTCInputOutputThread; ///< The thread in which we read WebRTC data when + tthread::thread *ioThread; ///< The thread in which we read WebRTC data when ///< we're receive media from another peer. uint32_t SSRC; ///< The SSRC for this local instance. Is used when generating RTCP reports. */ uint64_t rtcpTimeoutInMillis; ///< When current time in millis exceeds this timeout we have to