WHIP/WISH/WHEP support

This commit is contained in:
Thulinma 2023-03-07 13:25:57 +01:00
parent 4e69250e69
commit 3b3a00d7bd
2 changed files with 156 additions and 8 deletions

View file

@ -51,6 +51,7 @@ namespace Mist{
/* ------------------------------------------------ */
OutWebRTC::OutWebRTC(Socket::Connection &myConn) : HTTPOutput(myConn){
noSignalling = false;
totalPkts = 0;
totalLoss = 0;
totalRetrans = 0;
@ -303,6 +304,147 @@ namespace Mist{
}
}
void OutWebRTC::requestHandler(){
if (noSignalling){
if (!parseData){Util::sleep(500);}
//After 10s of no packets, abort
if (Util::bootMS() > lastRecv + 10000){
Util::logExitReason("received no data for 10+ seconds");
config->is_active = false;
}
return;
}
HTTPOutput::requestHandler();
}
void OutWebRTC::respondHTTP(const HTTP::Parser & req, bool headersOnly){
// Check for WHIP payload
if (req.method == "OPTIONS"){
H.setCORSHeaders();
H.StartResponse("200", "All good", req, myConn);
H.Chunkify(0, 0, myConn);
}
if (req.method == "POST"){
if (req.GetHeader("Content-Type") == "application/sdp"){
SDP::Session sdpParser;
const std::string &offerStr = req.body;
if (packetLog.is_open()){
packetLog << "[" << Util::bootMS() << "]" << offerStr << std::endl << std::endl;
}
if (!sdpParser.parseSDP(offerStr) || !sdpAnswer.parseOffer(offerStr)){
H.setCORSHeaders();
H.StartResponse("400", "Could not parse", req, myConn);
H.Chunkify("Failed to parse offer SDP", myConn);
H.Chunkify(0, 0, myConn);
return;
}
bool ret = false;
if (sdpParser.hasSendOnlyMedia()){
ret = handleSignalingCommandRemoteOfferForInput(sdpParser);
}else{
ret = handleSignalingCommandRemoteOfferForOutput(sdpParser);
}
if (ret){
noSignalling = true;
H.SetHeader("Content-Type", "application/sdp");
H.SetHeader("Location", streamName + "/" + JSON::Value(getpid()).asString());
if (config->getString("iceservers").size()){
std::deque<std::string> links;
JSON::Value iceConf = JSON::fromString(config->getString("iceservers"));
jsonForEach(iceConf, i){
if (i->isMember("url") && (*i)["url"].isString()){
JSON::Value &u = (*i)["url"];
std::string str = u.asString()+"; rel=\"ice-server\";";
if (i->isMember("username")){
str += " username=" + (*i)["username"].toString() + ";";
}
if (i->isMember("credential")){
str += " credential=" + (*i)["credential"].toString() + ";";
}
if (i->isMember("credentialType")){
str += " credential-type=" + (*i)["credentialType"].toString() + ";";
}
links.push_back(str);
}
if (i->isMember("urls") && (*i)["urls"].isString()){
JSON::Value &u = (*i)["urls"];
std::string str = u.asString()+"; rel=\"ice-server\";";
if (i->isMember("username")){
str += " username=" + (*i)["username"].toString() + ";";
}
if (i->isMember("credential")){
str += " credential=" + (*i)["credential"].toString() + ";";
}
if (i->isMember("credentialType")){
str += " credential-type=" + (*i)["credentialType"].toString() + ";";
}
links.push_back(str);
}
if (i->isMember("urls") && (*i)["urls"].isArray()){
jsonForEach((*i)["urls"], j){
JSON::Value &u = *j;
std::string str = u.asString()+"; rel=\"ice-server\";";
if (i->isMember("username")){
str += " username=" + (*i)["username"].toString() + ";";
}
if (i->isMember("credential")){
str += " credential=" + (*i)["credential"].toString() + ";";
}
if (i->isMember("credentialType")){
str += " credential-type=" + (*i)["credentialType"].toString() + ";";
}
links.push_back(str);
}
}
}
if (links.size()){
if (links.size() == 1){
H.SetHeader("Link", *links.begin());
}else{
std::deque<std::string>::iterator it = links.begin();
std::string linkHeader = *it;
++it;
while (it != links.end()){
linkHeader += "\r\nLink: " + *it;
++it;
}
H.SetHeader("Link", linkHeader);
}
}
}
H.setCORSHeaders();
H.StartResponse("201", "Created", req, myConn);
H.Chunkify(sdpAnswer.toString(), myConn);
H.Chunkify(0, 0, myConn);
myConn.close();
return;
}else{
H.setCORSHeaders();
H.StartResponse("403", "Not allowed", req, myConn);
H.Chunkify("Request not allowed", myConn);
H.Chunkify(0, 0, myConn);
return;
}
}
}
// We don't implement PATCH requests
if (req.method == "PATCH"){
H.setCORSHeaders();
H.StartResponse("405", "PATCH not supported", req, myConn);
H.Chunkify("This endpoint only supports WHIP/WHEP/WISH POST requests or WebSocket connections", myConn);
H.Chunkify(0, 0, myConn);
return;
}
//Generic response handler
H.setCORSHeaders();
H.StartResponse("405", "Must POST or use websocket", req, myConn);
H.Chunkify("This endpoint only supports WHIP/WHEP/WISH POST requests or WebSocket connections", myConn);
H.Chunkify(0, 0, myConn);
}
// This function is executed when we receive a signaling data.
// The signaling data contains commands that are used to start
// an input or output stream.
@ -604,11 +746,13 @@ namespace Mist{
}
bool OutWebRTC::dropPushTrack(uint32_t trackId, const std::string & dropReason){
JSON::Value commandResult;
commandResult["type"] = "on_track_drop";
commandResult["track"] = trackId;
commandResult["mediatype"] = M.getType(trackId);
webSock->sendFrame(commandResult.toString());
if (!noSignalling){
JSON::Value commandResult;
commandResult["type"] = "on_track_drop";
commandResult["track"] = trackId;
commandResult["mediatype"] = M.getType(trackId);
webSock->sendFrame(commandResult.toString());
}
return Output::dropPushTrack(trackId, dropReason);
}
@ -940,9 +1084,9 @@ namespace Mist{
sdpAnswer.setDirection("recvonly");
// start our receive thread (handles STUN, DTLS, RTP input)
webRTCInputOutputThread = new tthread::thread(webRTCInputOutputThreadFunc, NULL);
rtcpTimeoutInMillis = Util::bootMS() + 2000;
rtcpKeyFrameTimeoutInMillis = Util::bootMS() + 2000;
webRTCInputOutputThread = new tthread::thread(webRTCInputOutputThreadFunc, NULL);
idleInterval = 1000;
@ -1903,7 +2047,7 @@ namespace Mist{
//Do not reduce under 32 kbps
if (videoConstraint < 1024*32){videoConstraint = 1024*32;}
if (videoConstraint != preConstraint){
if (!noSignalling && videoConstraint != preConstraint){
INFO_MSG("Reduced video bandwidth maximum to %" PRIu32 " because average loss is %.2f", videoConstraint, curr_avg_loss);
JSON::Value commandResult;
commandResult["type"] = "on_video_bitrate";

View file

@ -132,6 +132,8 @@ namespace Mist{
virtual void sendHeader();
virtual void sendNext();
virtual void onWebsocketFrame();
virtual void respondHTTP(const HTTP::Parser & req, bool headersOnly);
virtual void preHTTP(){}
virtual void preWebsocketConnect();
virtual bool dropPushTrack(uint32_t trackId, const std::string & dropReason);
void onIdle();
@ -145,8 +147,10 @@ namespace Mist{
void onRTPPacketizerHasRTPPacket(const char *data, size_t nbytes);
void onRTPPacketizerHasRTCPPacket(const char *data, uint32_t nbytes);
virtual void connStats(uint64_t now, Comms::Connections &statComm);
inline virtual bool keepGoing(){return config->is_active && (noSignalling || myConn);}
virtual void requestHandler();
private:
bool noSignalling;
uint64_t lastRecv;
uint64_t lastPackMs;
uint64_t totalPkts;