Support for WebRTC data tracks (output only, for now), rewrite of dTLS integration (now part of socket lib), support for multi-path WebRTC connections

This commit is contained in:
Thulinma 2023-12-29 00:58:03 +01:00
parent 56193f89b1
commit 3987cfec3f
16 changed files with 1303 additions and 811 deletions

View file

@ -91,6 +91,7 @@ foreach output : outputs
endif
if extra.contains('srtp')
sources += files('output_webrtc_srtp.cpp', 'output_webrtc_srtp.h')
deps += usrsctp_dep
endif
if extra.contains('embed')
sources += embed_tgts

View file

@ -735,18 +735,22 @@ namespace Mist{
std::set<size_t> validTracks = M.getValidTracks();
if (!validTracks.size()){return 0;}
uint64_t start = 0xFFFFFFFFFFFFFFFFull;
uint64_t nonMetaStart = 0xFFFFFFFFFFFFFFFFull;
if (userSelect.size()){
for (std::map<size_t, Comms::Users>::iterator it = userSelect.begin(); it != userSelect.end(); it++){
if (M.trackValid(it->first) && start > M.getFirstms(it->first)){
start = M.getFirstms(it->first);
}
if (M.trackValid(it->first) && M.getType(it->first) != "meta" && nonMetaStart > M.getFirstms(it->first)){
nonMetaStart = M.getFirstms(it->first);
}
}
}else{
for (std::set<size_t>::iterator it = validTracks.begin(); it != validTracks.end(); it++){
if (start > M.getFirstms(*it)){start = M.getFirstms(*it);}
}
}
return start;
return nonMetaStart != 0xFFFFFFFFFFFFFFFFull ? nonMetaStart: start;
}
/// Return the end time of the selected tracks, or 0 if unknown or live.

File diff suppressed because it is too large Load diff

View file

@ -3,7 +3,6 @@
#include "output.h"
#include "output_http.h"
#include <mist/certificate.h>
#include <mist/dtls_srtp_handshake.h>
#include <mist/h264.h>
#include <mist/http_parser.h>
#include <mist/rtp_fec.h>
@ -14,6 +13,7 @@
#include <mist/websocket.h>
#include <fstream>
#include "output_webrtc_srtp.h"
#include <usrsctp.h>
#define NACK_BUFFER_SIZE 1024
@ -67,7 +67,19 @@ namespace Mist{
double jitter;
};
/* ------------------------------------------------ */
class WebRTCSocket{
public:
WebRTCSocket();
Socket::UDPConnection* udpSock;
SRTPReader srtpReader; ///< Used to unprotect incoming RTP and RTCP data. Uses the keys that
///< were exchanged with DTLS.
SRTPWriter srtpWriter; ///< Used to protect our RTP and RTCP data when sending data to another
///< peer. Uses the keys that were exchanged with DTLS.
std::map<uint32_t, nackBuffer> outBuffers;
size_t sendRTCP(const char * data, size_t len);
size_t ackNACK(uint32_t pSSRC, uint16_t seq);
Util::ResizeablePointer dataBuffer;
};
class OutWebRTC : public HTTPOutput{
public:
@ -82,10 +94,13 @@ namespace Mist{
virtual bool dropPushTrack(uint32_t trackId, const std::string & dropReason);
void handleWebsocketIdle();
virtual void onFail(const std::string &msg, bool critical = false);
bool onFinish();
bool doesWebsockets(){return true;}
void handleWebRTCInputOutputFromThread();
int onDTLSHandshakeWantsToWrite(const uint8_t *data, int *nbytes);
bool handleUDPSocket(Socket::UDPConnection & sock);
bool handleUDPSocket(WebRTCSocket & wSock);
void sendSCTPPacket(const char * data, size_t len);
void sendPaced(uint64_t uSendWindow);
void onSCTP(const char * data, size_t len, uint16_t stream, uint32_t ppid);
void onRTPSorterHasPacket(size_t tid, const RTP::Packet &pkt);
void onDTSCConverterHasPacket(const DTSC::Packet &pkt);
void onDTSCConverterHasInitData(const size_t trackID, const std::string &initData);
@ -95,7 +110,7 @@ namespace Mist{
inline virtual bool keepGoing(){return config->is_active && (noSignalling || myConn);}
virtual void requestHandler();
protected:
virtual void idleTime(uint64_t ms){udp.sendPaced(ms*1000);}
virtual void idleTime(uint64_t ms){sendPaced(ms*1000);}
private:
bool noSignalling;
uint64_t lastRecv;
@ -109,9 +124,8 @@ namespace Mist{
void ackNACK(uint32_t SSRC, uint16_t seq);
bool handleWebRTCInputOutput(); ///< Reads data from the UDP socket. Returns true when we read
///< some data, othewise false.
void handleReceivedSTUNPacket();
void handleReceivedDTLSPacket();
void handleReceivedRTPOrRTCPPacket();
void handleReceivedSTUNPacket(WebRTCSocket &wSock);
void handleReceivedRTPOrRTCPPacket(WebRTCSocket &wSock);
bool handleSignalingCommandRemoteOfferForInput(SDP::Session &sdpSession);
bool handleSignalingCommandRemoteOfferForOutput(SDP::Session &sdpSession);
void sendSignalingError(const std::string &commandType, const std::string &errorMessage);
@ -136,20 +150,19 @@ namespace Mist{
SDP::Session sdp; ///< SDP parser.
SDP::Answer sdpAnswer; ///< WIP: Replacing our `sdp` member ..
Certificate cert; ///< The TLS certificate. Used to generate a fingerprint in SDP answers.
DTLSSRTPHandshake dtlsHandshake; ///< Implements the DTLS handshake using the mbedtls library (fork).
SRTPReader srtpReader; ///< Used to unprotect incoming RTP and RTCP data. Uses the keys that
///< were exchanged with DTLS.
SRTPWriter srtpWriter; ///< Used to protect our RTP and RTCP data when sending data to another
///< peer. Uses the keys that were exchanged with DTLS.
Socket::UDPConnection udp; ///< Our UDP socket over which WebRTC data is received and sent.
Socket::UDPConnection mainSocket; //< Main socket created during the initial handshake
std::map<int, WebRTCSocket> sockets; ///< UDP sockets over which WebRTC data is received and sent.
std::set<int> rtpSockets; ///< UDP sockets over which (S)RTP data is transmitted/received
std::set<int> sctpSockets; ///< UDP sockets over which (S)RTP data is transmitted/received
uint16_t lastMediaSocket; //< Last socket number we received video/audio on
uint16_t lastMetaSocket; //< Last socket number we received non-media data on
uint16_t udpPort; ///< Port where we receive RTP, STUN, DTLS, etc.
StunReader stunReader; ///< Decodes STUN messages; during a session we keep receiving STUN
///< messages to which we need to reply.
std::map<uint64_t, WebRTCTrack> webrtcTracks; ///< WebRTCTracks indexed by payload type for incoming data and indexed by
///< myMeta.tracks[].trackID for outgoing data.
tthread::thread *webRTCInputOutputThread; ///< The thread in which we read WebRTC data when
///< we're receive media from another peer.
uint16_t udpPort; ///< The port on which our webrtc socket is bound. This is where we receive
///< RTP, STUN, DTLS, etc. */
uint32_t SSRC; ///< The SSRC for this local instance. Is used when generating RTCP reports. */
uint64_t rtcpTimeoutInMillis; ///< When current time in millis exceeds this timeout we have to
///< send a new RTCP packet.
@ -161,18 +174,15 @@ namespace Mist{
///< the signaling channel. Defaults to 6mbit.
uint32_t videoConstraint;
size_t audTrack, vidTrack, prevVidTrack;
size_t audTrack, vidTrack, prevVidTrack, metaTrack;
double target_rate; ///< Target playback speed rate (1.0 = normal, 0 = auto)
bool didReceiveKeyFrame; /* TODO burst delay */
bool didReceiveKeyFrame;
bool setPacketOffset;
int64_t packetOffset; ///< For timestamp rewrite with BMO
uint64_t lastTimeSync;
bool firstKey;
bool repeatInit;
bool stayLive;
bool doDTLS;
bool volkswagenMode;
double stats_jitter;
uint64_t stats_nacknum;
@ -191,13 +201,17 @@ namespace Mist{
std::map<uint8_t, uint64_t> payloadTypeToWebRTCTrack; ///< Maps e.g. RED to the corresponding track. Used when input
///< supports RED/ULPFEC; can also be used to map RTX in the
///< future.
std::map<uint32_t, nackBuffer> outBuffers;
uint64_t lastSR;
std::set<size_t> mustSendSR;
int64_t ntpClockDifference;
bool syncedNTPClock;
bool sctpInited;
bool sctpConnected;
struct socket * sctp_sock;
std::map<std::string, uint16_t> dataChannels;
std::deque<std::string> queuedJSON;
};
}// namespace Mist

View file

@ -14,6 +14,10 @@ SRTPReader::SRTPReader(){
memset((void *)&policy, 0x00, sizeof(policy));
}
SRTPReader::~SRTPReader(){
if (shutdown() != 0){FAIL_MSG("Failed to cleanly shutdown the srtp reader.");}
}
/*
Before initializing the srtp library we shut it down first
because initializing the library twice results in an error.
@ -203,6 +207,11 @@ SRTPWriter::SRTPWriter(){
memset((void *)&policy, 0x00, sizeof(policy));
}
SRTPWriter::~SRTPWriter(){
if (shutdown() != 0){FAIL_MSG("Failed to cleanly shutdown the srtp writer.");}
}
/*
Before initializing the srtp library we shut it down first
because initializing the library twice results in an error.

View file

@ -14,6 +14,7 @@
class SRTPReader{
public:
SRTPReader();
~SRTPReader();
int init(const std::string &cipher, const std::string &key, const std::string &salt);
int shutdown();
int unprotectRtp(uint8_t *data, int *nbytes); /* `nbytes` should contain the number of bytes in `data`. On success `nbytes`
@ -32,6 +33,7 @@ private:
class SRTPWriter{
public:
SRTPWriter();
~SRTPWriter();
int init(const std::string &cipher, const std::string &key, const std::string &salt);
int shutdown();
int protectRtp(uint8_t *data, int *nbytes);

View file

@ -67,23 +67,23 @@ void userOnActive(Comms::Connections &connections, size_t idx){
}
// Sanity checks
if (connections.getDown(idx) < connDown[idx]){
WARN_MSG("Connection downloaded bytes should be a counter, but has decreased in value");
MEDIUM_MSG("Connection downloaded bytes should be a counter, but has decreased in value");
connDown[idx] = connections.getDown(idx);
}
if (connections.getUp(idx) < connUp[idx]){
WARN_MSG("Connection uploaded bytes should be a counter, but has decreased in value");
MEDIUM_MSG("Connection uploaded bytes should be a counter, but has decreased in value");
connUp[idx] = connections.getUp(idx);
}
if (connections.getPacketCount(idx) < connPktcount[idx]){
WARN_MSG("Connection packet count should be a counter, but has decreased in value");
MEDIUM_MSG("Connection packet count should be a counter, but has decreased in value");
connPktcount[idx] = connections.getPacketCount(idx);
}
if (connections.getPacketLostCount(idx) < connPktloss[idx]){
WARN_MSG("Connection packet loss count should be a counter, but has decreased in value");
MEDIUM_MSG("Connection packet loss count should be a counter, but has decreased in value");
connPktloss[idx] = connections.getPacketLostCount(idx);
}
if (connections.getPacketRetransmitCount(idx) < connPktretrans[idx]){
WARN_MSG("Connection packets retransmitted should be a counter, but has decreased in value");
MEDIUM_MSG("Connection packets retransmitted should be a counter, but has decreased in value");
connPktretrans[idx] = connections.getPacketRetransmitCount(idx);
}
// Add increase in stats to global stats
@ -218,7 +218,7 @@ int main(int argc, char **argv){
return 0;
}
}
// Claim a spot in shared memory for this session on the global statistics page
sessions.reload();
if (!sessions){