Socket library and Config library restructuring, improvement to UDP socket reliability

This commit is contained in:
Thulinma 2020-06-25 23:34:26 +02:00
parent 97752f2c2d
commit 3d26741148
37 changed files with 151 additions and 110 deletions

View file

@ -265,7 +265,7 @@ int main_loop(int argc, char **argv){
}
if (Controller::Storage.isMember("config") && Controller::Storage["config"].isMember("debug") &&
Controller::Storage["config"]["debug"].isInt()){
Util::Config::printDebugLevel = Controller::Storage["config"]["debug"].asInt();
Util::printDebugLevel = Controller::Storage["config"]["debug"].asInt();
}
// check for port, interface and username in arguments
// if they are not there, take them from config file, if there

View file

@ -443,8 +443,8 @@ void Controller::handleUDPAPI(void *np){
uSock.SetDestination(UDP_API_HOST, UDP_API_PORT);
while (Controller::conf.is_active){
if (uSock.Receive()){
MEDIUM_MSG("UDP API: %s", uSock.data);
JSON::Value Request = JSON::fromString(uSock.data, uSock.data_len);
MEDIUM_MSG("UDP API: %s", (const char*)uSock.data);
JSON::Value Request = JSON::fromString(uSock.data, uSock.data.size());
Request["minimal"] = true;
JSON::Value Response;
if (Request.isObject()){
@ -454,7 +454,7 @@ void Controller::handleUDPAPI(void *np){
Response.removeMember("authorize");
uSock.SendNow(Response.toString());
}else{
WARN_MSG("Invalid API command received over UDP: %s", uSock.data);
WARN_MSG("Invalid API command received over UDP: %s", (const char*)uSock.data);
}
}else{
Util::sleep(500);
@ -515,9 +515,9 @@ void Controller::handleAPICommands(JSON::Value &Request, JSON::Value &Response){
JSON::Value &out = Controller::Storage["config"];
if (in.isMember("debug")){
out["debug"] = in["debug"];
if (Util::Config::printDebugLevel != (out["debug"].isInt() ? out["debug"].asInt() : DEBUG)){
Util::Config::printDebugLevel = (out["debug"].isInt() ? out["debug"].asInt() : DEBUG);
INFO_MSG("Debug level set to %u", Util::Config::printDebugLevel);
if (Util::printDebugLevel != (out["debug"].isInt() ? out["debug"].asInt() : DEBUG)){
Util::printDebugLevel = (out["debug"].isInt() ? out["debug"].asInt() : DEBUG);
INFO_MSG("Debug level set to %u", Util::printDebugLevel);
}
}
if (in.isMember("protocols")){

View file

@ -116,9 +116,9 @@ namespace Controller{
argarr[argnum++] = (char *)(p[it.key()].c_str());
}else{
if (it.key() == "debug"){
if (Util::Config::printDebugLevel != DEBUG){
if (Util::printDebugLevel != DEBUG){
static std::string debugLvlStr;
debugLvlStr = JSON::Value(Util::Config::printDebugLevel).asString();
debugLvlStr = JSON::Value(Util::printDebugLevel).asString();
argarr[argnum++] = (char *)((*it)["option"].asStringRef().c_str());
argarr[argnum++] = (char *)debugLvlStr.c_str();
}

View file

@ -306,7 +306,7 @@ namespace Mist{
int Input::boot(int argc, char *argv[]){
if (!(config->parseArgs(argc, argv))){return 1;}
streamName = config->getString("streamname");
Util::Config::streamName = streamName;
Util::streamName = streamName;
if (config->getBool("json")){
capa["version"] = PACKAGE_VERSION;

View file

@ -373,8 +373,8 @@ namespace Mist{
// // wrong sending port, ignore packet
// continue;
//}
tcpCon.addDown(s.data_len);
RTP::Packet pack(s.data, s.data_len);
tcpCon.addDown(s.data.size());
RTP::Packet pack(s.data, s.data.size());
if (!it->second.theirSSRC){it->second.theirSSRC = pack.getSSRC();}
it->second.sorter.addPacket(pack);
}

View file

@ -512,7 +512,7 @@ namespace Mist{
std::string leftData;
bool received = false;
while (udpCon.Receive()){
downCounter += udpCon.data_len;
downCounter += udpCon.data.size();
received = true;
if (!gettingData){
gettingData = true;
@ -521,20 +521,20 @@ namespace Mist{
size_t offset = 0;
// Try to read full TS Packets
// Watch out! We push here to a global, in order for threads to be able to access it.
while (offset < udpCon.data_len){
while (offset < udpCon.data.size()){
if (udpCon.data[offset] == 0x47){// check for sync byte
if (offset + 188 <= udpCon.data_len){
if (offset + 188 <= udpCon.data.size()){
tsBuf.FromPointer(udpCon.data + offset);
liveStream.add(tsBuf);
if (!liveStream.isDataTrack(tsBuf.getPID())){liveStream.parse(tsBuf.getPID());}
leftData.clear();
}else{
leftData.append(udpCon.data + offset, udpCon.data_len - offset);
leftData.append(udpCon.data + offset, udpCon.data.size() - offset);
}
offset += 188;
}else{
uint32_t maxBytes =
std::min((uint32_t)(188 - leftData.size()), (uint32_t)(udpCon.data_len - offset));
std::min((uint32_t)(188 - leftData.size()), (uint32_t)(udpCon.data.size() - offset));
uint32_t numBytes = maxBytes;
VERYHIGH_MSG("%" PRIu32 " bytes of non-sync-byte data received", numBytes);
if (leftData.size()){

View file

@ -80,7 +80,7 @@ namespace Mist{
// If we have a streamname option, set internal streamname to that option
if (!streamName.size() && config->hasOption("streamname")){
streamName = config->getString("streamname");
Util::Config::streamName = streamName;
Util::streamName = streamName;
}
/*LTS-START*/
@ -321,7 +321,7 @@ namespace Mist{
JSON::Value strCnf = Util::getStreamConfig(streamName);
if (strCnf && strCnf["fallback_stream"].asStringRef().size()){
streamName = strCnf["fallback_stream"].asStringRef();
Util::Config::streamName = streamName;
Util::streamName = streamName;
INFO_MSG("Switching to configured fallback stream '%s'", streamName.c_str());
reconnect();
return;
@ -352,7 +352,7 @@ namespace Mist{
newStrm.c_str());
std::string origStream = streamName;
streamName = newStrm;
Util::Config::streamName = streamName;
Util::streamName = streamName;
if (!Util::startInput(streamName, "", true, isPushing())){
onFail("Stream open failed (fallback stream for '" + origStream + "')", true);
return;

View file

@ -223,7 +223,7 @@ namespace Mist{
void OutDTSC::handlePlay(DTSC::Scan &dScan){
streamName = dScan.getMember("stream").asString();
Util::sanitizeName(streamName);
Util::Config::streamName = streamName;
Util::streamName = streamName;
parseData = true;
INFO_MSG("Handled play for stream %s", streamName.c_str());
setBlocking(false);
@ -233,7 +233,7 @@ namespace Mist{
streamName = dScan.getMember("stream").asString();
std::string passString = dScan.getMember("password").asString();
Util::sanitizeName(streamName);
Util::Config::streamName = streamName;
Util::streamName = streamName;
if (!allowPush(passString)){
onFail("Push not allowed - stream and/or password incorrect", true);
return;

View file

@ -397,7 +397,7 @@ namespace Mist{
int argnum = 0;
argarr[argnum++] = (char *)tmparg.c_str();
std::string temphost = getConnectedHost();
std::string debuglevel = JSON::Value(Util::Config::printDebugLevel).asString();
std::string debuglevel = JSON::Value(Util::printDebugLevel).asString();
argarr[argnum++] = (char *)"--ip";
argarr[argnum++] = (char *)(temphost.c_str());
argarr[argnum++] = (char *)"--stream";
@ -405,7 +405,7 @@ namespace Mist{
argarr[argnum++] = (char *)"--prequest";
argarr[argnum++] = (char *)(tmpPrequest.c_str());
// set the debug level if non-default
if (Util::Config::printDebugLevel != DEBUG){
if (Util::printDebugLevel != DEBUG){
argarr[argnum++] = (char *)"--debug";
argarr[argnum++] = (char *)(debuglevel.c_str());
}

View file

@ -441,7 +441,7 @@ namespace Mist{
INFO_MSG("Falling back to default stream '%s' -> '%s'", defStrm.c_str(), newStrm.c_str());
origStreamName = streamName;
streamName = newStrm;
Util::Config::streamName = streamName;
Util::streamName = streamName;
reconnect();
return getStatusJSON(reqHost, useragent);
}

View file

@ -216,7 +216,7 @@ namespace Mist{
char ffcmd[256];
ffcmd[255] = 0; // ensure there is an ending null byte
snprintf(ffcmd, 255, "ffmpeg %s -f h264 -i - %s -vframes 1 -f mjpeg -",
(Util::Config::printDebugLevel >= DLVL_MEDIUM ? "" : "-v quiet"),
(Util::printDebugLevel >= DLVL_MEDIUM ? "" : "-v quiet"),
config->getString("ffopts").c_str());
HIGH_MSG("Starting JPG command: %s", ffcmd);

View file

@ -828,11 +828,11 @@ namespace Mist{
if (streamName.find('?') != std::string::npos){
std::string tmpVars = streamName.substr(streamName.find('?') + 1);
streamName = streamName.substr(0, streamName.find('?'));
Util::Config::streamName = streamName;
Util::streamName = streamName;
HTTP::parseVars(tmpVars, targetParams);
}
Util::Config::streamName = streamName;
Util::streamName = streamName;
reqUrl += "/" + streamName; // LTS
/*LTS-START*/
@ -850,17 +850,17 @@ namespace Mist{
size_t lSlash = newUrl.rfind('/');
if (lSlash != std::string::npos){
streamName = newUrl.substr(lSlash + 1);
Util::Config::streamName = streamName;
Util::streamName = streamName;
}else{
streamName = newUrl;
Util::Config::streamName = streamName;
Util::streamName = streamName;
}
}
/*LTS-END*/
if (streamName.find('/')){
streamName = streamName.substr(0, streamName.find('/'));
Util::Config::streamName = streamName;
Util::streamName = streamName;
}
size_t colonPos = streamName.find(':');
@ -871,7 +871,7 @@ namespace Mist{
}else{
streamName = oldName.substr(colonPos + 1) + std::string(".") + oldName.substr(0, colonPos);
}
Util::Config::streamName = streamName;
Util::streamName = streamName;
}
Util::sanitizeName(streamName);
@ -922,14 +922,14 @@ namespace Mist{
int8_t playMessageType = messageType;
int32_t playStreamId = streamId;
streamName = Encodings::URL::decode(amfData.getContentP(3)->StrValue());
Util::Config::streamName = streamName;
Util::streamName = streamName;
reqUrl += "/" + streamName; // LTS
// handle variables
if (streamName.find('?') != std::string::npos){
std::string tmpVars = streamName.substr(streamName.find('?') + 1);
streamName = streamName.substr(0, streamName.find('?'));
Util::Config::streamName = streamName;
Util::streamName = streamName;
HTTP::parseVars(tmpVars, targetParams);
}
@ -941,7 +941,7 @@ namespace Mist{
}else{
streamName = oldName.substr(colonPos + 1) + std::string(".") + oldName.substr(0, colonPos);
}
Util::Config::streamName = streamName;
Util::streamName = streamName;
}
Util::sanitizeName(streamName);

View file

@ -485,8 +485,8 @@ namespace Mist{
continue;
}
lastRecv = Util::bootSecs(); // prevent disconnect of idle TCP connection when using UDP
myConn.addDown(s.data_len);
RTP::Packet pack(s.data, s.data_len);
myConn.addDown(s.data.size());
RTP::Packet pack(s.data, s.data.size());
if (!it->second.theirSSRC){it->second.theirSSRC = pack.getSSRC();}
it->second.sorter.addPacket(pack);
}

View file

@ -910,7 +910,7 @@ namespace Mist{
bool hadPack = false;
while (udp.Receive()){
hadPack = true;
myConn.addDown(udp.data_len);
myConn.addDown(udp.data.size());
uint8_t fb = (uint8_t)udp.data[0];
@ -936,7 +936,7 @@ namespace Mist{
size_t nparsed = 0;
StunMessage stun_msg;
if (stunReader.parse((uint8_t *)udp.data, udp.data_len, nparsed, stun_msg) != 0){
if (stunReader.parse((uint8_t *)(char*)udp.data, udp.data.size(), nparsed, stun_msg) != 0){
FAIL_MSG("Failed to parse a stun message.");
return;
}
@ -1006,7 +1006,7 @@ namespace Mist{
return;
}
if (dtlsHandshake.parse((const uint8_t *)udp.data, udp.data_len) != 0){
if (dtlsHandshake.parse((const uint8_t *)(const char*)udp.data, udp.data.size()) != 0){
FAIL_MSG("Failed to parse a DTLS packet.");
return;
}
@ -1049,7 +1049,7 @@ namespace Mist{
if ((pt < 64) || (pt >= 96)){
RTP::Packet rtp_pkt((const char *)udp.data, (unsigned int)udp.data_len);
RTP::Packet rtp_pkt((const char *)udp.data, (unsigned int)udp.data.size());
uint16_t currSeqNum = rtp_pkt.getSequence();
size_t idx = M.trackIDToIndex(rtp_pkt.getPayloadType(), getpid());
@ -1071,8 +1071,8 @@ namespace Mist{
WebRTCTrack &rtcTrack = webrtcTracks[idx];
// Decrypt the SRTP to RTP
int len = (int)udp.data_len;
if (srtpReader.unprotectRtp((uint8_t *)udp.data, &len) != 0){
int len = udp.data.size();
if (srtpReader.unprotectRtp((uint8_t *)(char*)udp.data, &len) != 0){
if (packetLog.is_open()){packetLog << "[" << Util::bootMS() << "]" << "RTP decrypt failure" << std::endl;}
FAIL_MSG("Failed to unprotect a RTP packet.");
return;
@ -1102,8 +1102,8 @@ namespace Mist{
}else{
//Decrypt feedback packet
int len = udp.data_len;
if (srtpReader.unprotectRtcp((uint8_t *)udp.data, &len) != 0){
int len = udp.data.size();
if (srtpReader.unprotectRtcp((uint8_t *)(char*)udp.data, &len) != 0){
if (packetLog.is_open()){packetLog << "[" << Util::bootMS() << "]" << "RTCP decrypt failure" << std::endl;}
FAIL_MSG("Failed to unprotect RTCP.");
return;

View file

@ -46,7 +46,7 @@ namespace Mist{
streamName = opt["sink"].asString();
if (!streamName.size()){streamName = opt["source"].asString();}
Util::streamVariables(streamName, opt["source"].asString());
Util::Config::streamName = opt["source"].asString() + "" + streamName;
Util::streamName = opt["source"].asString() + "" + streamName;
}
bool needsLock(){return false;}
bool isSingular(){return false;}

View file

@ -389,7 +389,7 @@ namespace Mist{
streamName = opt["sink"].asString();
if (!streamName.size()){streamName = opt["source"].asString();}
Util::streamVariables(streamName, opt["source"].asString());
Util::Config::streamName = opt["source"].asString() + "" + streamName;
Util::streamName = opt["source"].asString() + "" + streamName;
}
std::string EncodeOutputEBML::getTrackType(int tid){return M.getType(tid);}

View file

@ -227,7 +227,7 @@ namespace Mist{
streamName = opt["sink"].asString();
if (!streamName.size()){streamName = opt["source"].asString();}
Util::streamVariables(streamName, opt["source"].asString());
Util::Config::streamName = opt["source"].asString() + "" + streamName;
Util::streamName = opt["source"].asString() + "" + streamName;
preRun();
};
virtual bool needsLock(){return false;}