Improve WebRTC things, fix CPU usage
This commit is contained in:
parent
bdab107011
commit
b103353cf9
6 changed files with 50 additions and 100 deletions
|
@ -19,9 +19,10 @@ namespace Comms{
|
|||
uint8_t defaultCommFlags = 0;
|
||||
|
||||
/// \brief Refreshes the session configuration if the last update was more than 5 seconds ago
|
||||
void sessionConfigCache(){
|
||||
void sessionConfigCache(uint64_t bootMs){
|
||||
static uint64_t lastUpdate = 0;
|
||||
if (Util::bootSecs() > lastUpdate + 5){
|
||||
if (!bootMs){bootMs = Util::bootMS();}
|
||||
if (bootMs > lastUpdate + 5000){
|
||||
VERYHIGH_MSG("Updating session config");
|
||||
JSON::Value tmpVal = Util::getGlobalConfig("sessionViewerMode");
|
||||
if (!tmpVal.isNull()){ sessionViewerMode = tmpVal.asInt(); }
|
||||
|
@ -35,7 +36,7 @@ namespace Comms{
|
|||
if (!tmpVal.isNull()){ sessionStreamInfoMode = tmpVal.asInt(); }
|
||||
tmpVal = Util::getGlobalConfig("tknMode");
|
||||
if (!tmpVal.isNull()){ tknMode = tmpVal.asInt(); }
|
||||
lastUpdate = Util::bootSecs();
|
||||
lastUpdate = bootMs;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -27,7 +27,7 @@ namespace Comms{
|
|||
extern uint8_t sessionStreamInfoMode;
|
||||
extern uint8_t tknMode;
|
||||
extern uint8_t defaultCommFlags;
|
||||
void sessionConfigCache();
|
||||
void sessionConfigCache(uint64_t bootMs = 0);
|
||||
|
||||
class Comms{
|
||||
public:
|
||||
|
|
|
@ -1670,7 +1670,8 @@ namespace Mist{
|
|||
/*LTS-END*/
|
||||
DONTEVEN_MSG("MistOut client handler started");
|
||||
while (keepGoing() && (wantRequest || parseData)){
|
||||
Comms::sessionConfigCache();
|
||||
thisBootMs = Util::bootMS();
|
||||
Comms::sessionConfigCache(thisBootMs);
|
||||
if (wantRequest){requestHandler();}
|
||||
if (parseData){
|
||||
if (!isInitialized){
|
||||
|
@ -2273,7 +2274,7 @@ namespace Mist{
|
|||
if (!isInitialized){return;}
|
||||
// also cancel if it has been less than a second since the last update
|
||||
// unless force is set to true
|
||||
uint64_t now = Util::bootSecs();
|
||||
uint64_t now = thisBootMs / 1000;
|
||||
if (now <= lastStats && !force){return;}
|
||||
|
||||
if (isRecording()){
|
||||
|
|
|
@ -160,6 +160,7 @@ namespace Mist{
|
|||
|
||||
uint64_t firstPacketTime;
|
||||
uint64_t lastPacketTime;
|
||||
uint64_t thisBootMs;
|
||||
|
||||
std::map<size_t, IPC::sharedPage> curPage; ///< For each track, holds the page that is currently being written.
|
||||
};
|
||||
|
|
|
@ -24,17 +24,30 @@ namespace Mist{
|
|||
|
||||
OutWebRTC *classPointer = 0;
|
||||
|
||||
/* ------------------------------------------------ */
|
||||
static void webRTCInputOutputThreadFunc(void *arg){
|
||||
classPointer->handleWebRTCInputOutputFromThread();
|
||||
}
|
||||
|
||||
static void onRTPSorterHasPacketCallback(const uint64_t track, const RTP::Packet &p){
|
||||
classPointer->onRTPSorterHasPacket(track, p);
|
||||
}
|
||||
|
||||
static void onDTSCConverterHasInitDataCallback(const uint64_t track, const std::string &initData){
|
||||
classPointer->onDTSCConverterHasInitData(track, initData);
|
||||
}
|
||||
|
||||
static void onDTSCConverterHasPacketCallback(const DTSC::Packet &pkt){
|
||||
classPointer->onDTSCConverterHasPacket(pkt);
|
||||
}
|
||||
|
||||
static void onRTPPacketizerHasDataCallback(void *socket, const char *data, size_t len, uint8_t channel){
|
||||
classPointer->onRTPPacketizerHasRTPPacket(data, len);
|
||||
}
|
||||
|
||||
static void onRTPPacketizerHasRTCPDataCallback(void *socket, const char *data, size_t len, uint8_t){
|
||||
classPointer->onRTPPacketizerHasRTCPPacket(data, len);
|
||||
}
|
||||
|
||||
static uint32_t generateSSRC();
|
||||
static void webRTCInputOutputThreadFunc(void *arg);
|
||||
static void onDTSCConverterHasPacketCallback(const DTSC::Packet &pkt);
|
||||
static void onDTSCConverterHasInitDataCallback(const uint64_t track, const std::string &initData);
|
||||
static void onRTPSorterHasPacketCallback(const uint64_t track,
|
||||
const RTP::Packet &p); // when we receive RTP packets we store them in a sorter. Whenever there is a valid,
|
||||
// sorted RTP packet that can be used this function is called.
|
||||
static void onRTPPacketizerHasDataCallback(void *socket, const char *data, size_t len, uint8_t channel);
|
||||
static void onRTPPacketizerHasRTCPDataCallback(void *socket, const char *data, size_t nbytes, uint8_t channel);
|
||||
|
||||
#ifdef WITH_DATACHANNELS
|
||||
static int sctp_recv_cb(struct socket *s, union sctp_sockstore addr, void *data, size_t datalen, struct sctp_rcvinfo rcv, int flags, void *ulp_info){
|
||||
|
@ -161,9 +174,9 @@ namespace Mist{
|
|||
lastTimeSync = 0;
|
||||
maxSkipAhead = 0;
|
||||
needsLookAhead = 0;
|
||||
webRTCInputOutputThread = NULL;
|
||||
ioThread = 0;
|
||||
udpPort = 0;
|
||||
SSRC = generateSSRC();
|
||||
Util::getRandomBytes(&SSRC, sizeof(SSRC));
|
||||
rtcpTimeoutInMillis = 0;
|
||||
rtcpKeyFrameDelayInMillis = 2000;
|
||||
rtcpKeyFrameTimeoutInMillis = 0;
|
||||
|
@ -226,11 +239,10 @@ namespace Mist{
|
|||
}
|
||||
|
||||
OutWebRTC::~OutWebRTC(){
|
||||
|
||||
if (webRTCInputOutputThread && webRTCInputOutputThread->joinable()){
|
||||
webRTCInputOutputThread->join();
|
||||
delete webRTCInputOutputThread;
|
||||
webRTCInputOutputThread = NULL;
|
||||
if (ioThread && ioThread->joinable()){
|
||||
ioThread->join();
|
||||
delete ioThread;
|
||||
ioThread = 0;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -402,9 +414,13 @@ namespace Mist{
|
|||
void OutWebRTC::requestHandler(){
|
||||
if (noSignalling){
|
||||
// For WHEP, make sure we keep listening for packets while waiting for new data to come in for sending
|
||||
if (parseData && !handleWebRTCInputOutput()){sendPaced(10);}
|
||||
//After 10s of no packets, abort
|
||||
if (Util::bootMS() > lastRecv + 10000){
|
||||
if (parseData && !handleWebRTCInputOutput()){
|
||||
sendPaced(10);
|
||||
}else{
|
||||
if (ioThread){Util::sleep(500);}
|
||||
}
|
||||
// After 10s of no packets, abort
|
||||
if (thisBootMs > lastRecv + 10000){
|
||||
Util::logExitReason(ER_CLEAN_INACTIVE, "received no data for 10+ seconds");
|
||||
config->is_active = false;
|
||||
}
|
||||
|
@ -922,8 +938,8 @@ namespace Mist{
|
|||
|
||||
if (!meta.getBootMsOffset()){meta.setBootMsOffset(Util::bootMS());}
|
||||
|
||||
if (webRTCInputOutputThread != NULL){
|
||||
FAIL_MSG("It seems that we're already have a webrtc i/o thread running.");
|
||||
if (ioThread){
|
||||
FAIL_MSG("It seems that we're already have a WebRTC I/O thread running, aborting input request");
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -1038,7 +1054,7 @@ namespace Mist{
|
|||
// start our receive thread (handles STUN, DTLS, RTP input)
|
||||
rtcpTimeoutInMillis = Util::bootMS() + 2000;
|
||||
rtcpKeyFrameTimeoutInMillis = Util::bootMS() + 2000;
|
||||
webRTCInputOutputThread = new tthread::thread(webRTCInputOutputThreadFunc, NULL);
|
||||
ioThread = new tthread::thread(webRTCInputOutputThreadFunc, NULL);
|
||||
|
||||
idleInterval = 1000;
|
||||
|
||||
|
@ -1105,9 +1121,7 @@ namespace Mist{
|
|||
|
||||
/* ------------------------------------------------ */
|
||||
|
||||
// This function is called from the `webRTCInputOutputThreadFunc()`
|
||||
// function. The `webRTCInputOutputThreadFunc()` is basically empty
|
||||
// and all work for the thread is done here.
|
||||
/// Worker function for ioThread
|
||||
void OutWebRTC::handleWebRTCInputOutputFromThread(){
|
||||
while (keepGoing()){
|
||||
if (!handleWebRTCInputOutput()){sendPaced(10);}
|
||||
|
@ -2303,71 +2317,4 @@ namespace Mist{
|
|||
}
|
||||
}
|
||||
|
||||
/* ------------------------------------------------ */
|
||||
|
||||
// This is our thread function that is started right before we
|
||||
// call `allowPush()` and send our answer SDP back to the
|
||||
// client.
|
||||
static void webRTCInputOutputThreadFunc(void *arg){
|
||||
if (!classPointer){
|
||||
FAIL_MSG("classPointer hasn't been set. Exiting thread.");
|
||||
return;
|
||||
}
|
||||
classPointer->handleWebRTCInputOutputFromThread();
|
||||
}
|
||||
|
||||
static void onRTPSorterHasPacketCallback(const uint64_t track, const RTP::Packet &p){
|
||||
if (!classPointer){
|
||||
FAIL_MSG("We received a sorted RTP packet but our `classPointer` is invalid.");
|
||||
return;
|
||||
}
|
||||
classPointer->onRTPSorterHasPacket(track, p);
|
||||
}
|
||||
|
||||
static void onDTSCConverterHasInitDataCallback(const uint64_t track, const std::string &initData){
|
||||
if (!classPointer){
|
||||
FAIL_MSG("Received a init data, but our `classPointer` is invalid.");
|
||||
return;
|
||||
}
|
||||
classPointer->onDTSCConverterHasInitData(track, initData);
|
||||
}
|
||||
|
||||
static void onDTSCConverterHasPacketCallback(const DTSC::Packet &pkt){
|
||||
if (!classPointer){
|
||||
FAIL_MSG("Received a DTSC packet that was created from RTP data, but our `classPointer` is "
|
||||
"invalid.");
|
||||
return;
|
||||
}
|
||||
classPointer->onDTSCConverterHasPacket(pkt);
|
||||
}
|
||||
|
||||
static void onRTPPacketizerHasDataCallback(void *socket, const char *data, size_t len, uint8_t channel){
|
||||
if (!classPointer){
|
||||
FAIL_MSG("Received a RTP packet but our `classPointer` is invalid.");
|
||||
return;
|
||||
}
|
||||
classPointer->onRTPPacketizerHasRTPPacket(data, len);
|
||||
}
|
||||
|
||||
static void onRTPPacketizerHasRTCPDataCallback(void *socket, const char *data, size_t len, uint8_t){
|
||||
if (!classPointer){
|
||||
FAIL_MSG("Received a RTCP packet, but out `classPointer` is invalid.");
|
||||
return;
|
||||
}
|
||||
classPointer->onRTPPacketizerHasRTCPPacket(data, len);
|
||||
}
|
||||
|
||||
static uint32_t generateSSRC(){
|
||||
|
||||
uint32_t ssrc = 0;
|
||||
|
||||
do{
|
||||
ssrc = rand();
|
||||
ssrc = ssrc << 16;
|
||||
ssrc += rand();
|
||||
}while (ssrc == 0 || ssrc == 0xffffffff);
|
||||
|
||||
return ssrc;
|
||||
}
|
||||
|
||||
}// namespace Mist
|
||||
|
|
|
@ -163,7 +163,7 @@ namespace Mist{
|
|||
///< messages to which we need to reply.
|
||||
std::map<uint64_t, WebRTCTrack> webrtcTracks; ///< WebRTCTracks indexed by payload type for incoming data and indexed by
|
||||
///< myMeta.tracks[].trackID for outgoing data.
|
||||
tthread::thread *webRTCInputOutputThread; ///< The thread in which we read WebRTC data when
|
||||
tthread::thread *ioThread; ///< The thread in which we read WebRTC data when
|
||||
///< we're receive media from another peer.
|
||||
uint32_t SSRC; ///< The SSRC for this local instance. Is used when generating RTCP reports. */
|
||||
uint64_t rtcpTimeoutInMillis; ///< When current time in millis exceeds this timeout we have to
|
||||
|
|
Loading…
Add table
Reference in a new issue