diff --git a/lib/auth.cpp b/lib/auth.cpp index 468cd381..276fc5db 100644 --- a/lib/auth.cpp +++ b/lib/auth.cpp @@ -213,7 +213,7 @@ namespace Secure{ } /// Calculates a SHA256 digest as per NSAs SHA-2, returning it as binary. - /// Assumes output is big enough to contain 16 bytes of data. + /// Assumes output is big enough to contain 32 bytes of data. void sha256bin(const char *input, const unsigned int in_len, char *output){ // Initialize the hash, according to MD5 spec. uint32_t hash[] ={0x6a09e667, 0xbb67ae85, 0x3c6ef372, 0xa54ff53a, diff --git a/lib/config.cpp b/lib/config.cpp index 361e33c3..74c4ccc4 100644 --- a/lib/config.cpp +++ b/lib/config.cpp @@ -36,7 +36,7 @@ bool Util::Config::is_active = false; bool Util::Config::is_restarting = false; -static Socket::Server *serv_sock_pointer = 0; +static int serv_sock_fd = -1; uint32_t Util::printDebugLevel = DEBUG; __thread char Util::streamName[256] = {0}; __thread char Util::exitReason[256] = {0}; @@ -521,7 +521,7 @@ int Util::Config::serveThreadedSocket(int (*callback)(Socket::Connection &)){ return 1; } Socket::getSocketName(server_socket.getSocket(), Util::listenInterface, Util::listenPort); - serv_sock_pointer = &server_socket; + serv_sock_fd = server_socket.getSocket(); activate(); if (server_socket.getSocket()){ int oldSock = server_socket.getSocket(); @@ -531,7 +531,7 @@ int Util::Config::serveThreadedSocket(int (*callback)(Socket::Connection &)){ } } int r = threadServer(server_socket, callback); - serv_sock_pointer = 0; + serv_sock_fd = -1; return r; } @@ -549,7 +549,7 @@ int Util::Config::serveForkedSocket(int (*callback)(Socket::Connection &S)){ return 1; } Socket::getSocketName(server_socket.getSocket(), Util::listenInterface, Util::listenPort); - serv_sock_pointer = &server_socket; + serv_sock_fd = server_socket.getSocket(); activate(); if (server_socket.getSocket()){ int oldSock = server_socket.getSocket(); @@ -559,7 +559,7 @@ int Util::Config::serveForkedSocket(int (*callback)(Socket::Connection &S)){ } } int r = forkServer(server_socket, callback); - serv_sock_pointer = 0; + serv_sock_fd = -1; return r; } @@ -601,6 +601,10 @@ void Util::Config::setMutexAborter(void * mutex){ mutabort = (tthread::mutex*)mutex; } +void Util::Config::setServerFD(int fd){ + serv_sock_fd = fd; +} + /// Basic signal handler. Sets is_active to false if it receives /// a SIGINT, SIGHUP or SIGTERM signal, reaps children for the SIGCHLD /// signal, and ignores all other signals. @@ -610,7 +614,7 @@ void Util::Config::signal_handler(int signum, siginfo_t *sigInfo, void *ignore){ case SIGHUP: case SIGTERM: if (!mutabort || mutabort->try_lock()){ - if (serv_sock_pointer){serv_sock_pointer->close();} + if (serv_sock_fd != -1){close(serv_sock_fd);} if (mutabort){mutabort->unlock();} } #if DEBUG >= DLVL_DEVEL diff --git a/lib/config.h b/lib/config.h index a22967b6..f23b598d 100644 --- a/lib/config.h +++ b/lib/config.h @@ -38,6 +38,7 @@ namespace Util{ public: static void setMutexAborter(void * mutex); static void wipeShm(); + static void setServerFD(int fd); // variables static bool is_active; ///< Set to true by activate(), set to false by the signal handler. static bool is_restarting; ///< Set to true when restarting, set to false on boot. diff --git a/lib/socket.cpp b/lib/socket.cpp index f1df95d7..3e2104c8 100644 --- a/lib/socket.cpp +++ b/lib/socket.cpp @@ -44,6 +44,14 @@ static const char *gai_strmagic(int errcode){ } } +std::string Socket::sockaddrToString(const sockaddr* A){ + char addressBuffer[INET6_ADDRSTRLEN]; + if (inet_ntop(AF_INET, A, addressBuffer, INET6_ADDRSTRLEN)){ + return addressBuffer; + } + return ""; +} + static std::string getIPv6BinAddr(const struct sockaddr_in6 &remoteaddr){ char tmpBuffer[17] = "\000\000\000\000\000\000\000\000\000\000\377\377\000\000\000\000"; switch (remoteaddr.sin6_family){ @@ -130,6 +138,44 @@ bool Socket::matchIPv6Addr(const std::string &A, const std::string &B, uint8_t p return true; } +bool Socket::compareAddress(const sockaddr* A, const sockaddr* B){ + if (!A || !B){return false;} + bool aSix = false, bSix = false; + char *aPtr = 0, *bPtr = 0; + uint16_t aPort = 0, bPort = 0; + if (A->sa_family == AF_INET){ + aPtr = (char*)&((sockaddr_in*)A)->sin_addr; + aPort = ((sockaddr_in*)A)->sin_port; + }else if(A->sa_family == AF_INET6){ + aPtr = (char*)&((sockaddr_in6*)A)->sin6_addr; + aPort = ((sockaddr_in6*)A)->sin6_port; + if (!memcmp("\000\000\000\000\000\000\000\000\000\000\377\377", aPtr, 12)){ + aPtr += 12; + }else{ + aSix = true; + } + }else{ + return false; + } + if (B->sa_family == AF_INET){ + bPtr = (char*)&((sockaddr_in*)B)->sin_addr; + bPort = ((sockaddr_in*)B)->sin_port; + }else if(B->sa_family == AF_INET6){ + bPtr = (char*)&((sockaddr_in6*)B)->sin6_addr; + bPort = ((sockaddr_in6*)B)->sin6_port; + if (!memcmp("\000\000\000\000\000\000\000\000\000\000\377\377", bPtr, 12)){ + bPtr += 12; + }else{ + bSix = true; + } + }else{ + return false; + } + if (aPort != bPort){return false;} + if (aSix != bSix){return false;} + return !memcmp(aPtr, bPtr, aSix?16:4); +} + /// Attempts to match the given address with optional subnet to the given binary-form IPv6 address. /// Returns true if match could be made, false otherwise. bool Socket::isBinAddress(const std::string &binAddr, std::string addr){ @@ -180,7 +226,7 @@ std::string Socket::getBinForms(std::string addr){ memset(&hints, 0, sizeof(struct addrinfo)); hints.ai_family = AF_UNSPEC; hints.ai_socktype = SOCK_STREAM; - hints.ai_flags = AI_ADDRCONFIG | AI_V4MAPPED; + hints.ai_flags = AI_ADDRCONFIG | AI_V4MAPPED | AI_ALL; hints.ai_protocol = 0; hints.ai_canonname = NULL; hints.ai_addr = NULL; @@ -200,6 +246,45 @@ std::string Socket::getBinForms(std::string addr){ return ret; } +std::deque Socket::getAddrs(std::string addr, uint16_t port, int family){ + std::deque ret; + struct addrinfo *result, *rp, hints; + if (addr.substr(0, 7) == "::ffff:"){addr = addr.substr(7);} + std::stringstream ss; + ss << port; + + memset(&hints, 0, sizeof(struct addrinfo)); + // For unspecified, we do IPv6, then do IPv4 separately after + hints.ai_family = family==AF_UNSPEC?AF_INET6:family; + hints.ai_socktype = SOCK_DGRAM; + hints.ai_flags = AI_ADDRCONFIG | AI_PASSIVE | AI_V4MAPPED | AI_ALL; + hints.ai_protocol = IPPROTO_UDP; + int s = getaddrinfo(addr.c_str(), ss.str().c_str(), &hints, &result); + if (!s){ + // Store each address in a string and put it in the deque. + for (rp = result; rp != NULL; rp = rp->ai_next){ + ret.push_back(std::string((char*)rp->ai_addr, rp->ai_addrlen)); + } + freeaddrinfo(result); + } + + // If failed or unspecified, (also) try IPv4 + if (s || family==AF_UNSPEC){ + hints.ai_family = AF_INET; + s = getaddrinfo(addr.c_str(), ss.str().c_str(), &hints, &result); + if (!s){ + // Store each address in a string and put it in the deque. + for (rp = result; rp != NULL; rp = rp->ai_next){ + ret.push_back(std::string((char*)rp->ai_addr, rp->ai_addrlen)); + } + freeaddrinfo(result); + } + } + + // Return all we found + return ret; +} + /// Checks bytes (length len) containing a binary-encoded IPv4 or IPv6 IP address, and writes it in /// human-readable notation to target. Writes "unknown" if it cannot decode to a sensible value. void Socket::hostBytesToStr(const char *bytes, size_t len, std::string &target){ @@ -1679,6 +1764,7 @@ void Socket::UDPConnection::init(bool _nonblock, int _family){ isConnected = false; wasEncrypted = false; pretendReceive = false; + ignoreSendErrors = false; sock = socket(family, SOCK_DGRAM, 0); if (sock == -1 && family == AF_INET6){ sock = socket(AF_INET, SOCK_DGRAM, 0); @@ -1700,10 +1786,6 @@ void Socket::UDPConnection::init(bool _nonblock, int _family){ up = 0; down = 0; - destAddr = 0; - destAddr_size = 0; - recvAddr = 0; - recvAddr_size = 0; hasReceiveData = false; #ifdef __CYGWIN__ data.allocate(SOCKETSIZE); @@ -1712,6 +1794,23 @@ void Socket::UDPConnection::init(bool _nonblock, int _family){ #endif } +void Socket::UDPConnection::assimilate(int _sock){ + if (sock != -1){close();} + sock = _sock; + { // Extract socket family + struct sockaddr_storage fin_addr; + socklen_t alen = sizeof(fin_addr); + if (getsockname(sock, (struct sockaddr *)&fin_addr, &alen) == 0){ + family = fin_addr.ss_family; + if (family == AF_INET6){ + boundPort = ntohs(((struct sockaddr_in6 *)&fin_addr)->sin6_port); + }else{ + boundPort = ntohs(((struct sockaddr_in *)&fin_addr)->sin_port); + } + } + } +} + #if HAVE_UPSTREAM_MBEDTLS_SRTP #if MBEDTLS_VERSION_MAJOR > 2 static void dtlsExtractKeyData( void *user, mbedtls_ssl_key_export_type type, const unsigned char *ms, size_t, const unsigned char client_random[32], const unsigned char server_random[32], mbedtls_tls_prf_types tls_prf_type){ @@ -1927,18 +2026,10 @@ void Socket::UDPConnection::checkRecvBuf(){ Socket::UDPConnection::UDPConnection(const UDPConnection &o){ init(!o.isBlocking, o.family); INFO_MSG("Copied socket of type %s", addrFam(o.family)); - if (o.destAddr && o.destAddr_size){ - destAddr = malloc(o.destAddr_size); - destAddr_size = o.destAddr_size; - if (destAddr){memcpy(destAddr, o.destAddr, o.destAddr_size);} - } - if (o.recvAddr && o.recvAddr_size){ - recvAddr = malloc(o.recvAddr_size); - recvAddr_size = o.recvAddr_size; - if (recvAddr){memcpy(recvAddr, o.recvAddr, o.recvAddr_size);} - } + if (o.destAddr.size()){destAddr = o.destAddr;} + if (o.recvAddr.size()){recvAddr = o.recvAddr;} if (o.data.size()){ - data.assign(o.data, o.data.size()); + data = o.data; pretendReceive = true; } hasReceiveData = o.hasReceiveData; @@ -1956,14 +2047,6 @@ void Socket::UDPConnection::close(){ /// Closes the UDP socket, cleans up any memory allocated by the socket. Socket::UDPConnection::~UDPConnection(){ close(); - if (destAddr){ - free(destAddr); - destAddr = 0; - } - if (recvAddr){ - free(recvAddr); - recvAddr = 0; - } #ifdef SSL deinitDTLS(); #endif @@ -1975,10 +2058,10 @@ bool Socket::UDPConnection::operator==(const Socket::UDPConnection& b) const{ if (sock == b.sock){return true;} // If either is closed (and the other is not), not equal. if (sock == -1 || b.sock == -1){return false;} - size_t recvSize = recvAddr_size; - if (b.recvAddr_size < recvSize){recvSize = b.recvAddr_size;} - size_t destSize = destAddr_size; - if (b.destAddr_size < destSize){destSize = b.destAddr_size;} + size_t recvSize = recvAddr.size(); + if (b.recvAddr.size() < recvSize){recvSize = b.recvAddr.size();} + size_t destSize = destAddr.size(); + if (b.destAddr.size() < destSize){destSize = b.destAddr.size();} // They are equal if they hold the same local and remote address. if (recvSize && destSize && destAddr && b.destAddr && recvAddr && b.recvAddr){ if (!memcmp(recvAddr, b.recvAddr, recvSize) && !memcmp(destAddr, b.destAddr, destSize)){ @@ -1999,128 +2082,108 @@ void Socket::UDPConnection::setSocketFamily(int AF_TYPE){\ /// Allocates enough space for the largest type of address we support, so that receive calls can write to it. void Socket::UDPConnection::allocateDestination(){ - if (destAddr && destAddr_size < sizeof(sockaddr_in6)){ - free(destAddr); - destAddr = 0; + if (!destAddr.size()){ + destAddr.truncate(0); + destAddr.allocate(sizeof(sockaddr_in6)); + memset(destAddr, 0, sizeof(sockaddr_in6)); + ((struct sockaddr *)(char*)destAddr)->sa_family = AF_UNSPEC; + destAddr.append(0, sizeof(sockaddr_in6)); } - if (!destAddr){ - destAddr = malloc(sizeof(sockaddr_in6)); - if (destAddr){ - destAddr_size = sizeof(sockaddr_in6); - memset(destAddr, 0, sizeof(sockaddr_in6)); - ((struct sockaddr_in *)destAddr)->sin_family = AF_UNSPEC; - } - } - if (recvAddr && recvAddr_size < sizeof(sockaddr_in6)){ - free(recvAddr); - recvAddr = 0; - } - if (!recvAddr){ - recvAddr = malloc(sizeof(sockaddr_in6)); - if (recvAddr){ - recvAddr_size = sizeof(sockaddr_in6); - memset(recvAddr, 0, sizeof(sockaddr_in6)); - ((struct sockaddr_in *)recvAddr)->sin_family = AF_UNSPEC; - } - const int opt = 1; - if (setsockopt(sock, IPPROTO_IP, IP_PKTINFO, &opt, sizeof(opt))){ - WARN_MSG("Could not set PKTINFO to 1!"); + if (!recvAddr.size()){ + recvAddr.truncate(0); + recvAddr.allocate(sizeof(sockaddr_in6)); + memset(recvAddr, 0, sizeof(sockaddr_in6)); + ((struct sockaddr *)(char*)recvAddr)->sa_family = AF_UNSPEC; + recvAddr.append(0, sizeof(sockaddr_in6)); + } +#ifdef HASPKTINFO + const int opt = 1; + if (setsockopt(sock, IPPROTO_IP, IP_PKTINFO, &opt, sizeof(opt))){ + WARN_MSG("Could not set IPv4 packet info receiving enabled!"); + } + if (family == AF_INET6){ + if (setsockopt(sock, IPPROTO_IPV6, IPV6_RECVPKTINFO, &opt, sizeof(opt))){ + WARN_MSG("Could not set IPv6 packet info receiving enabled!"); } } +#endif } /// Stores the properties of the receiving end of this UDP socket. /// This will be the receiving end for all SendNow calls. void Socket::UDPConnection::SetDestination(std::string destIp, uint32_t port){ DONTEVEN_MSG("Setting destination to %s:%u", destIp.c_str(), port); - // UDP sockets can switch between IPv4 and IPv6 on demand. - // We change IPv4-mapped IPv6 addresses into IPv4 addresses for Windows-sillyness reasons. - if (destIp.substr(0, 7) == "::ffff:"){destIp = destIp.substr(7);} - struct addrinfo *result, *rp, hints; - std::stringstream ss; - ss << port; - memset(&hints, 0, sizeof(struct addrinfo)); - hints.ai_family = family; - hints.ai_socktype = SOCK_DGRAM; - hints.ai_flags = AI_ADDRCONFIG | AI_ALL; - hints.ai_protocol = IPPROTO_UDP; - hints.ai_canonname = NULL; - hints.ai_addr = NULL; - hints.ai_next = NULL; - int s = getaddrinfo(destIp.c_str(), ss.str().c_str(), &hints, &result); - if (s != 0){ - hints.ai_family = AF_UNSPEC; - s = getaddrinfo(destIp.c_str(), ss.str().c_str(), &hints, &result); - if (s != 0){ - FAIL_MSG("Could not connect UDP socket to %s:%i! Error: %s", destIp.c_str(), port, gai_strmagic(s)); - return; - } + std::deque addrs = getAddrs(destIp, port, family); + for (std::deque::iterator it = addrs.begin(); it != addrs.end(); ++it){ + if (setDestination((sockaddr*)it->data(), it->size())){return;} } - - for (rp = result; rp != NULL; rp = rp->ai_next){ - // assume success - if (destAddr){ - free(destAddr); - destAddr = 0; - } - destAddr_size = rp->ai_addrlen; - destAddr = malloc(destAddr_size); - if (!destAddr){return;} - memcpy(destAddr, rp->ai_addr, rp->ai_addrlen); - if (family != rp->ai_family){ - INFO_MSG("Switching UDP socket from %s to %s", addrFam(family), addrFam(rp->ai_family)); - close(); - family = rp->ai_family; - sock = socket(family, SOCK_DGRAM, 0); - { - // Allow address re-use - int on = 1; - setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); - } - checkRecvBuf(); - if (boundPort){ - INFO_MSG("Rebinding to %s:%d %s", boundAddr.c_str(), boundPort, boundMulti.c_str()); - bind(boundPort, boundAddr, boundMulti); - } - } - { - std::string trueDest; - uint32_t truePort; - GetDestination(trueDest, truePort); - HIGH_MSG("Set UDP destination: %s:%d => %s:%d (%s)", destIp.c_str(), port, trueDest.c_str(), truePort, addrFam(family)); - } - freeaddrinfo(result); - return; - //\todo Possibly detect and handle failure - } - freeaddrinfo(result); - free(destAddr); - destAddr = 0; + destAddr.truncate(0); + allocateDestination(); FAIL_MSG("Could not set destination for UDP socket: %s:%d", destIp.c_str(), port); }// Socket::UDPConnection SetDestination +bool Socket::UDPConnection::setDestination(sockaddr * addr, size_t size){ + // UDP sockets can on-the-fly switch between IPv4/IPv6 if necessary + if (family != addr->sa_family){ + if (ignoreSendErrors){return false;} + WARN_MSG("Switching UDP socket from %s to %s", addrFam(family), addrFam(((sockaddr*)(char*)destAddr)->sa_family)); + close(); + family = addr->sa_family; + sock = socket(family, SOCK_DGRAM, 0); + { + // Allow address re-use + int on = 1; + setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); + } + if (family == AF_INET6){ + const int optval = 0; + if (setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, &optval, sizeof(optval)) < 0){ + WARN_MSG("Could not set IPv6 UDP socket to be dual-stack! %s", strerror(errno)); + } + } + checkRecvBuf(); + if (boundPort){ + INFO_MSG("Rebinding to %s:%d %s", boundAddr.c_str(), boundPort, boundMulti.c_str()); + bind(boundPort, boundAddr, boundMulti); + } + } + hasReceiveData = false; + destAddr.assign(addr, size); + { + std::string trueDest; + uint32_t truePort; + GetDestination(trueDest, truePort); + HIGH_MSG("Set UDP destination to %s:%d (%s)", trueDest.c_str(), truePort, addrFam(family)); + } + return true; +} + +const Util::ResizeablePointer & Socket::UDPConnection::getRemoteAddr() const{ + return destAddr; +} + /// Gets the properties of the receiving end of this UDP socket. /// This will be the receiving end for all SendNow calls. void Socket::UDPConnection::GetDestination(std::string &destIp, uint32_t &port){ - if (!destAddr || !destAddr_size){ + if (!destAddr.size()){ destIp = ""; port = 0; return; } char addr_str[INET6_ADDRSTRLEN + 1]; addr_str[INET6_ADDRSTRLEN] = 0; // set last byte to zero, to prevent walking out of the array - if (((struct sockaddr_in *)destAddr)->sin_family == AF_INET6){ - if (inet_ntop(AF_INET6, &(((struct sockaddr_in6 *)destAddr)->sin6_addr), addr_str, INET6_ADDRSTRLEN) != 0){ + if (((struct sockaddr *)(char*)destAddr)->sa_family == AF_INET6){ + if (inet_ntop(AF_INET6, &(((struct sockaddr_in6 *)(char*)destAddr)->sin6_addr), addr_str, INET6_ADDRSTRLEN) != 0){ destIp = addr_str; - port = ntohs(((struct sockaddr_in6 *)destAddr)->sin6_port); + port = ntohs(((struct sockaddr_in6 *)(char*)destAddr)->sin6_port); return; } } - if (((struct sockaddr_in *)destAddr)->sin_family == AF_INET){ - if (inet_ntop(AF_INET, &(((struct sockaddr_in *)destAddr)->sin_addr), addr_str, INET6_ADDRSTRLEN) != 0){ + if (((struct sockaddr_in *)(char*)destAddr)->sin_family == AF_INET){ + if (inet_ntop(AF_INET, &(((struct sockaddr_in *)(char*)destAddr)->sin_addr), addr_str, INET6_ADDRSTRLEN) != 0){ destIp = addr_str; - port = ntohs(((struct sockaddr_in *)destAddr)->sin_port); + port = ntohs(((struct sockaddr_in *)(char*)destAddr)->sin_port); return; } } @@ -2132,24 +2195,24 @@ void Socket::UDPConnection::GetDestination(std::string &destIp, uint32_t &port){ /// Gets the properties of the receiving end of the local UDP socket. /// This will be the sending end for all SendNow calls. void Socket::UDPConnection::GetLocalDestination(std::string &destIp, uint32_t &port){ - if (!recvAddr || !recvAddr_size){ + if (!recvAddr.size()){ destIp = ""; port = 0; return; } char addr_str[INET6_ADDRSTRLEN + 1]; addr_str[INET6_ADDRSTRLEN] = 0; // set last byte to zero, to prevent walking out of the array - if (((struct sockaddr_in *)recvAddr)->sin_family == AF_INET6){ - if (inet_ntop(AF_INET6, &(((struct sockaddr_in6 *)recvAddr)->sin6_addr), addr_str, INET6_ADDRSTRLEN) != 0){ + if (((struct sockaddr *)(char*)recvAddr)->sa_family == AF_INET6){ + if (inet_ntop(AF_INET6, &(((struct sockaddr_in6 *)(char*)recvAddr)->sin6_addr), addr_str, INET6_ADDRSTRLEN) != 0){ destIp = addr_str; - port = ntohs(((struct sockaddr_in6 *)recvAddr)->sin6_port); + port = ntohs(((struct sockaddr_in6 *)(char*)recvAddr)->sin6_port); return; } } - if (((struct sockaddr_in *)recvAddr)->sin_family == AF_INET){ - if (inet_ntop(AF_INET, &(((struct sockaddr_in *)recvAddr)->sin_addr), addr_str, INET6_ADDRSTRLEN) != 0){ + if (((struct sockaddr *)(char*)recvAddr)->sa_family == AF_INET){ + if (inet_ntop(AF_INET, &(((struct sockaddr_in *)(char*)recvAddr)->sin_addr), addr_str, INET6_ADDRSTRLEN) != 0){ destIp = addr_str; - port = ntohs(((struct sockaddr_in *)recvAddr)->sin_port); + port = ntohs(((struct sockaddr_in *)(char*)recvAddr)->sin_port); return; } } @@ -2162,7 +2225,7 @@ void Socket::UDPConnection::GetLocalDestination(std::string &destIp, uint32_t &p /// This will be the receiving end for all SendNow calls. std::string Socket::UDPConnection::getBinDestination(){ std::string binList; - if (destAddr && destAddr_size){binList = getIPv6BinAddr(*(sockaddr_in6*)destAddr);} + if (destAddr.size()){binList = getIPv6BinAddr(*(sockaddr_in6*)(char*)destAddr);} if (binList.size() < 16){return std::string("\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000", 16);} return binList.substr(0, 16); }// Socket::UDPConnection GetDestination @@ -2170,12 +2233,12 @@ std::string Socket::UDPConnection::getBinDestination(){ /// Returns the port number of the receiving end of this socket. /// Returns 0 on error. uint32_t Socket::UDPConnection::getDestPort() const{ - if (!destAddr || !destAddr_size){return 0;} - if (((struct sockaddr_in *)destAddr)->sin_family == AF_INET6){ - return ntohs(((struct sockaddr_in6 *)destAddr)->sin6_port); + if (!destAddr.size()){return 0;} + if (((const struct sockaddr *)(const char*)destAddr)->sa_family == AF_INET6){ + return ntohs(((const struct sockaddr_in6 *)(const char*)destAddr)->sin6_port); } - if (((struct sockaddr_in *)destAddr)->sin_family == AF_INET){ - return ntohs(((struct sockaddr_in *)destAddr)->sin_port); + if (((const struct sockaddr *)(const char*)destAddr)->sa_family == AF_INET){ + return ntohs(((const struct sockaddr_in *)(const char*)destAddr)->sin_port); } return 0; } @@ -2189,6 +2252,10 @@ void Socket::UDPConnection::setBlocking(bool blocking){ } } +void Socket::UDPConnection::setIgnoreSendErrors(bool ign){ + ignoreSendErrors = ign; +} + /// Sends a UDP datagram using the buffer sdata. /// This function simply calls SendNow(const char*, size_t) void Socket::UDPConnection::SendNow(const std::string &sdata){ @@ -2207,7 +2274,7 @@ void Socket::UDPConnection::SendNow(const char *sdata){ /// Does not do anything if len < 1. /// Prints an DLVL_FAIL level debug message if sending failed. void Socket::UDPConnection::SendNow(const char *sdata, size_t len){ - SendNow(sdata, len, (sockaddr*)destAddr, destAddr_size); + SendNow(sdata, len, (sockaddr*)(char*)destAddr, destAddr.size()); } /// Sends a UDP datagram using the buffer sdata of length len. @@ -2220,6 +2287,7 @@ void Socket::UDPConnection::SendNow(const char *sdata, size_t len, sockaddr * dA if (r > 0){ up += r; }else{ + if (ignoreSendErrors){return;} if (errno == EDESTADDRREQ){ close(); return; @@ -2228,8 +2296,8 @@ void Socket::UDPConnection::SendNow(const char *sdata, size_t len, sockaddr * dA } return; } -#if !defined(__CYGWIN__) && !defined(_WIN32) - if (hasReceiveData && recvAddr){ +#ifdef HASPKTINFO + if (hasReceiveData && recvAddr.size()){ msghdr mHdr; char msg_control[0x100]; iovec iovec; @@ -2244,22 +2312,34 @@ void Socket::UDPConnection::SendNow(const char *sdata, size_t len, sockaddr * dA mHdr.msg_flags = 0; int cmsg_space = 0; cmsghdr * cmsg = CMSG_FIRSTHDR(&mHdr); - cmsg->cmsg_level = IPPROTO_IP; - cmsg->cmsg_type = IP_PKTINFO; + if (family == AF_INET){ + cmsg->cmsg_level = IPPROTO_IP; + cmsg->cmsg_type = IP_PKTINFO; - struct in_pktinfo in_pktinfo; - memcpy(&(in_pktinfo.ipi_spec_dst), &(((sockaddr_in*)recvAddr)->sin_family), sizeof(in_pktinfo.ipi_spec_dst)); - in_pktinfo.ipi_ifindex = recvInterface; - cmsg->cmsg_len = CMSG_LEN(sizeof(in_pktinfo)); - *(struct in_pktinfo*)CMSG_DATA(cmsg) = in_pktinfo; - cmsg_space += CMSG_SPACE(sizeof(in_pktinfo)); + struct in_pktinfo in_pktinfo; + memcpy(&(in_pktinfo.ipi_spec_dst), &(((sockaddr_in*)(char*)recvAddr)->sin_family), sizeof(in_pktinfo.ipi_spec_dst)); + in_pktinfo.ipi_ifindex = recvInterface; + cmsg->cmsg_len = CMSG_LEN(sizeof(in_pktinfo)); + *(struct in_pktinfo*)CMSG_DATA(cmsg) = in_pktinfo; + cmsg_space += CMSG_SPACE(sizeof(in_pktinfo)); + }else if (family == AF_INET6){ + cmsg->cmsg_level = IPPROTO_IPV6; + cmsg->cmsg_type = IPV6_PKTINFO; + + struct in6_pktinfo in6_pktinfo; + memcpy(&(in6_pktinfo.ipi6_addr), &(((sockaddr_in6*)(char*)recvAddr)->sin6_addr), sizeof(in6_pktinfo.ipi6_addr)); + in6_pktinfo.ipi6_ifindex = recvInterface; + cmsg->cmsg_len = CMSG_LEN(sizeof(in6_pktinfo)); + *(struct in6_pktinfo*)CMSG_DATA(cmsg) = in6_pktinfo; + cmsg_space += CMSG_SPACE(sizeof(in6_pktinfo)); + } mHdr.msg_controllen = cmsg_space; int r = sendmsg(sock, &mHdr, 0); if (r > 0){ up += r; }else{ - if (errno != ENETUNREACH){ + if (errno != ENETUNREACH && !ignoreSendErrors){ FAIL_MSG("Could not send UDP data through %d: %s", sock, strerror(errno)); } } @@ -2270,11 +2350,11 @@ void Socket::UDPConnection::SendNow(const char *sdata, size_t len, sockaddr * dA if (r > 0){ up += r; }else{ - if (errno != ENETUNREACH){ + if (errno != ENETUNREACH && !ignoreSendErrors){ FAIL_MSG("Could not send UDP data through %d: %s", sock, strerror(errno)); } } -#if !defined(__CYGWIN__) && !defined(_WIN32) +#ifdef HASPKTINFO } #endif } @@ -2372,6 +2452,10 @@ std::string Socket::UDPConnection::getBoundAddress(){ return boundaddr; } +uint16_t Socket::UDPConnection::getBoundPort() const{ + return boundPort; +} + /// Bind to a port number, returning the bound port. /// If that fails, returns zero. /// \arg port Port to bind to, required. @@ -2383,13 +2467,15 @@ uint16_t Socket::UDPConnection::bind(int port, std::string iface, const std::str close(); // we open a new socket for each attempt int addr_ret; bool multicast = false; + bool repeatWithIPv4 = false; struct addrinfo hints, *addr_result, *rp; memset(&hints, 0, sizeof(hints)); hints.ai_flags = AI_ADDRCONFIG | AI_PASSIVE | AI_V4MAPPED; - if (destAddr && destAddr_size){ - hints.ai_family = ((struct sockaddr_in *)destAddr)->sin_family; + if (destAddr.size()){ + hints.ai_family = ((struct sockaddr *)(char*)destAddr)->sa_family; }else{ - hints.ai_family = AF_UNSPEC; + hints.ai_family = AF_INET6; + repeatWithIPv4 = true; } hints.ai_socktype = SOCK_DGRAM; @@ -2398,14 +2484,24 @@ uint16_t Socket::UDPConnection::bind(int port, std::string iface, const std::str std::stringstream ss; ss << port; +repeatAddressFinding: + if (iface == "0.0.0.0" || iface.length() == 0){ if ((addr_ret = getaddrinfo(0, ss.str().c_str(), &hints, &addr_result)) != 0){ FAIL_MSG("Could not resolve %s for UDP: %s", iface.c_str(), gai_strmagic(addr_ret)); + if (repeatWithIPv4 && hints.ai_family != AF_INET){ + hints.ai_family = AF_INET; + goto repeatAddressFinding; + } return 0; } }else{ if ((addr_ret = getaddrinfo(iface.c_str(), ss.str().c_str(), &hints, &addr_result)) != 0){ FAIL_MSG("Could not resolve %s for UDP: %s", iface.c_str(), gai_strmagic(addr_ret)); + if (repeatWithIPv4 && hints.ai_family != AF_INET){ + hints.ai_family = AF_INET; + goto repeatAddressFinding; + } return 0; } } @@ -2422,7 +2518,7 @@ uint16_t Socket::UDPConnection::bind(int port, std::string iface, const std::str } if (rp->ai_family == AF_INET6){ const int optval = 0; - if (setsockopt(sock, SOL_SOCKET, IPV6_V6ONLY, &optval, sizeof(optval)) < 0){ + if (setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, &optval, sizeof(optval)) < 0){ WARN_MSG("Could not set IPv6 UDP socket to be dual-stack! %s", strerror(errno)); } } @@ -2483,6 +2579,10 @@ uint16_t Socket::UDPConnection::bind(int port, std::string iface, const std::str freeaddrinfo(addr_result); if (sock == -1){ FAIL_MSG("Could not open %s for UDP: %s", iface.c_str(), err_str.c_str()); + if (repeatWithIPv4 && hints.ai_family != AF_INET){ + hints.ai_family = AF_INET; + goto repeatAddressFinding; + } return 0; } @@ -2570,7 +2670,7 @@ uint16_t Socket::UDPConnection::bind(int port, std::string iface, const std::str } bool Socket::UDPConnection::connect(){ - if (!recvAddr || !recvAddr_size || !destAddr || !destAddr_size){ + if (!recvAddr.size() || !destAddr.size()){ WARN_MSG("Attempting to connect a UDP socket without local and/or remote address!"); return false; } @@ -2579,27 +2679,23 @@ bool Socket::UDPConnection::connect(){ std::string destIp; uint32_t port = 0; char addr_str[INET6_ADDRSTRLEN + 1]; - if (((struct sockaddr_in *)recvAddr)->sin_family == AF_INET6){ - if (inet_ntop(AF_INET6, &(((struct sockaddr_in6 *)recvAddr)->sin6_addr), addr_str, INET6_ADDRSTRLEN) != 0){ + if (((struct sockaddr *)(char*)recvAddr)->sa_family == AF_INET6){ + if (inet_ntop(AF_INET6, &(((struct sockaddr_in6 *)(char*)recvAddr)->sin6_addr), addr_str, INET6_ADDRSTRLEN) != 0){ destIp = addr_str; - port = ntohs(((struct sockaddr_in6 *)recvAddr)->sin6_port); + port = ntohs(((struct sockaddr_in6 *)(char*)recvAddr)->sin6_port); } } - if (((struct sockaddr_in *)recvAddr)->sin_family == AF_INET){ - if (inet_ntop(AF_INET, &(((struct sockaddr_in *)recvAddr)->sin_addr), addr_str, INET6_ADDRSTRLEN) != 0){ + if (((struct sockaddr *)(char*)recvAddr)->sa_family == AF_INET){ + if (inet_ntop(AF_INET, &(((struct sockaddr_in *)(char*)recvAddr)->sin_addr), addr_str, INET6_ADDRSTRLEN) != 0){ destIp = addr_str; - port = ntohs(((struct sockaddr_in *)recvAddr)->sin_port); + port = ntohs(((struct sockaddr_in *)(char*)recvAddr)->sin_port); } } - int ret = ::bind(sock, (const struct sockaddr*)recvAddr, recvAddr_size); + int ret = ::bind(sock, (const struct sockaddr*)(char*)recvAddr, recvAddr.size()); if (!ret){ INFO_MSG("Bound socket %d to %s:%" PRIu32, sock, destIp.c_str(), port); }else{ - FAIL_MSG("Failed to bind socket %d (%s) %s:%" PRIu32 ": %s", sock, addrFam(((struct sockaddr_in *)recvAddr)->sin_family), destIp.c_str(), port, strerror(errno)); - std::ofstream bleh("/tmp/socket_recv"); - bleh.write((const char*)recvAddr, recvAddr_size); - bleh.write((const char*)destAddr, destAddr_size); - bleh.close(); + FAIL_MSG("Failed to bind socket %d (%s) %s:%" PRIu32 ": %s", sock, addrFam(((struct sockaddr *)(char*)recvAddr)->sa_family), destIp.c_str(), port, strerror(errno)); return false; } } @@ -2608,19 +2704,19 @@ bool Socket::UDPConnection::connect(){ std::string destIp; uint32_t port; char addr_str[INET6_ADDRSTRLEN + 1]; - if (((struct sockaddr_in *)destAddr)->sin_family == AF_INET6){ - if (inet_ntop(AF_INET6, &(((struct sockaddr_in6 *)destAddr)->sin6_addr), addr_str, INET6_ADDRSTRLEN) != 0){ + if (((struct sockaddr *)(char*)destAddr)->sa_family == AF_INET6){ + if (inet_ntop(AF_INET6, &(((struct sockaddr_in6 *)(char*)destAddr)->sin6_addr), addr_str, INET6_ADDRSTRLEN) != 0){ destIp = addr_str; - port = ntohs(((struct sockaddr_in6 *)destAddr)->sin6_port); + port = ntohs(((struct sockaddr_in6 *)(char*)destAddr)->sin6_port); } } - if (((struct sockaddr_in *)destAddr)->sin_family == AF_INET){ - if (inet_ntop(AF_INET, &(((struct sockaddr_in *)destAddr)->sin_addr), addr_str, INET6_ADDRSTRLEN) != 0){ + if (((struct sockaddr *)(char*)destAddr)->sa_family == AF_INET){ + if (inet_ntop(AF_INET, &(((struct sockaddr_in *)(char*)destAddr)->sin_addr), addr_str, INET6_ADDRSTRLEN) != 0){ destIp = addr_str; - port = ntohs(((struct sockaddr_in *)destAddr)->sin_port); + port = ntohs(((struct sockaddr_in *)(char*)destAddr)->sin_port); } } - int ret = ::connect(sock, (const struct sockaddr*)destAddr, destAddr_size); + int ret = ::connect(sock, (const struct sockaddr*)(char*)destAddr, destAddr.size()); if (!ret){ INFO_MSG("Connected socket to %s:%" PRIu32, destIp.c_str(), port); }else{ @@ -2685,18 +2781,37 @@ bool Socket::UDPConnection::Receive(){ if (errno != EAGAIN){INFO_MSG("UDP receive: %d (%s)", errno, strerror(errno));} return false; } - if (destAddr && destsize && destAddr_size >= destsize){memcpy(destAddr, &addr, destsize);} -#if !defined(__CYGWIN__) && !defined(_WIN32) - if (recvAddr){ + if (destAddr.size() && destsize){destAddr.assign(&addr, destsize);} +#ifdef HASPKTINFO + if (recvAddr.size()){ for ( struct cmsghdr *cmsg = CMSG_FIRSTHDR(&mHdr); cmsg != NULL; cmsg = CMSG_NXTHDR(&mHdr, cmsg)){ - if (cmsg->cmsg_level != IPPROTO_IP || cmsg->cmsg_type != IP_PKTINFO){continue;} - struct in_pktinfo* pi = (in_pktinfo*)CMSG_DATA(cmsg); - struct sockaddr_in * recvCast = (sockaddr_in*)recvAddr; - recvCast->sin_family = family; - recvCast->sin_port = htons(boundPort); - memcpy(&(recvCast->sin_addr), &(pi->ipi_spec_dst), sizeof(pi->ipi_spec_dst)); - recvInterface = pi->ipi_ifindex; - hasReceiveData = true; + if (cmsg->cmsg_level == IPPROTO_IP && cmsg->cmsg_type == IP_PKTINFO){ + struct in_pktinfo* pi = (in_pktinfo*)CMSG_DATA(cmsg); + if (family == AF_INET6){ + struct sockaddr_in6 * recvCast = (sockaddr_in6*)(char*)recvAddr; + recvCast->sin6_port = htons(boundPort); + recvCast->sin6_family = AF_INET6; + memcpy(((char*)&(recvCast->sin6_addr)) + 12, &(pi->ipi_spec_dst), sizeof(pi->ipi_spec_dst)); + memset((void*)&(recvCast->sin6_addr), 0, 10); + memset((char*)&(recvCast->sin6_addr) + 10, 255, 2); + }else{ + struct sockaddr_in * recvCast = (sockaddr_in*)(char*)recvAddr; + recvCast->sin_port = htons(boundPort); + recvCast->sin_family = AF_INET; + memcpy(&(recvCast->sin_addr), &(pi->ipi_spec_dst), sizeof(pi->ipi_spec_dst)); + } + recvInterface = pi->ipi_ifindex; + hasReceiveData = true; + } + if (cmsg->cmsg_level == IPPROTO_IPV6 && cmsg->cmsg_type == IPV6_PKTINFO){ + struct in6_pktinfo* pi = (in6_pktinfo*)CMSG_DATA(cmsg); + struct sockaddr_in6 * recvCast = (sockaddr_in6*)(char*)recvAddr; + recvCast->sin6_family = AF_INET6; + recvCast->sin6_port = htons(boundPort); + memcpy(&(recvCast->sin6_addr), &(pi->ipi6_addr), sizeof(pi->ipi6_addr)); + recvInterface = pi->ipi6_ifindex; + hasReceiveData = true; + } } } #endif diff --git a/lib/socket.h b/lib/socket.h index 97785765..d0ca67e6 100644 --- a/lib/socket.h +++ b/lib/socket.h @@ -51,10 +51,13 @@ namespace Buffer{ /// Holds Socket tools. namespace Socket{ + std::string sockaddrToString(const sockaddr* A); void hostBytesToStr(const char *bytes, size_t len, std::string &target); bool isBinAddress(const std::string &binAddr, std::string matchTo); bool matchIPv6Addr(const std::string &A, const std::string &B, uint8_t prefix); + bool compareAddress(const sockaddr* A, const sockaddr* B); std::string getBinForms(std::string addr); + std::deque getAddrs(std::string addr, uint16_t port, int family = AF_UNSPEC); /// Returns true if given human-readable address (address, not hostname) is a local address. bool isLocal(const std::string &host); /// Returns true if given human-readable hostname is a local address. @@ -215,14 +218,12 @@ namespace Socket{ class UDPConnection{ private: void init(bool nonblock, int family = AF_INET6); - int sock; ///< Internally saved socket number. + int sock; ///< Internally saved socket number std::string remotehost; ///< Stores remote host address - void *destAddr; ///< Destination address pointer. - unsigned int destAddr_size; ///< Size of the destination address pointer. - void *recvAddr; ///< Destination address pointer. - unsigned int recvAddr_size; ///< Size of the destination address pointer. - unsigned int up; ///< Amount of bytes transferred up. - unsigned int down; ///< Amount of bytes transferred down. + Util::ResizeablePointer destAddr; ///< Destination address + Util::ResizeablePointer recvAddr; ///< Local address + unsigned int up; ///< Amount of bytes transferred up + unsigned int down; ///< Amount of bytes transferred down int family; ///< Current socket address family std::string boundAddr, boundMulti; int boundPort; @@ -233,6 +234,7 @@ namespace Socket{ bool hasReceiveData; bool isBlocking; bool isConnected; + bool ignoreSendErrors; bool pretendReceive; ///< If true, will pretend to have just received the current data buffer on new Receive() call bool onData(); @@ -254,6 +256,7 @@ namespace Socket{ UDPConnection(const UDPConnection &o); UDPConnection(bool nonblock = false); ~UDPConnection(); + void assimilate(int sock); bool operator==(const UDPConnection& b) const; operator bool() const; #ifdef SSL @@ -269,14 +272,18 @@ namespace Socket{ uint16_t bind(int port, std::string iface = "", const std::string &multicastAddress = ""); bool connect(); void setBlocking(bool blocking); + void setIgnoreSendErrors(bool ign); void allocateDestination(); void SetDestination(std::string hostname, uint32_t port); + bool setDestination(sockaddr * addr, size_t size); + const Util::ResizeablePointer & getRemoteAddr() const; void GetDestination(std::string &hostname, uint32_t &port); void GetLocalDestination(std::string &hostname, uint32_t &port); std::string getBinDestination(); const void * getDestAddr(){return destAddr;} - size_t getDestAddrLen(){return destAddr_size;} + size_t getDestAddrLen(){return destAddr.size();} std::string getBoundAddress(); + uint16_t getBoundPort() const; uint32_t getDestPort() const; bool Receive(); void SendNow(const std::string &data); diff --git a/lib/socket_srt.cpp b/lib/socket_srt.cpp index 091e34e1..21e60748 100644 --- a/lib/socket_srt.cpp +++ b/lib/socket_srt.cpp @@ -1,5 +1,5 @@ #include "defines.h" -#include "lib/http_parser.h" +#include "http_parser.h" #include "socket_srt.h" #include "json.h" #include "timing.h" @@ -9,6 +9,15 @@ #define INVALID_SRT_SOCKET -1 +/// Calls gai_strerror with the given argument, calling regular strerror on the global errno as needed +static const char *gai_strmagic(int errcode){ + if (errcode == EAI_SYSTEM){ + return strerror(errno); + }else{ + return gai_strerror(errcode); + } +} + namespace Socket{ namespace SRT{ bool isInited = false; @@ -26,6 +35,7 @@ namespace Socket{ bool libraryCleanup(){ if (isInited){ + alarm(2); srt_cleanup(); isInited = false; } @@ -41,17 +51,36 @@ namespace Socket{ sockaddr_in createInetAddr(const std::string &_host, int _port){ sockaddr_in res; - memset(&res, 9, sizeof res); - res.sin_family = AF_INET; - res.sin_port = htons(_port); + memset(&res, 0, sizeof res); + struct addrinfo *result, *rp, hints; + std::stringstream ss; + ss << _port; - if (_host != ""){ - if (inet_pton(AF_INET, _host.c_str(), &res.sin_addr) == 1){return res;} - hostent *he = gethostbyname(_host.c_str()); - if (!he || he->h_addrtype != AF_INET){ERROR_MSG("Host not found %s", _host.c_str());} - res.sin_addr = *(in_addr *)he->h_addr_list[0]; + memset(&hints, 0, sizeof(struct addrinfo)); + hints.ai_family = AF_INET6; + hints.ai_socktype = SOCK_DGRAM; + hints.ai_flags = AI_ADDRCONFIG | AI_ALL; + hints.ai_protocol = IPPROTO_UDP; + hints.ai_canonname = NULL; + hints.ai_addr = NULL; + hints.ai_next = NULL; + int s = getaddrinfo(_host.c_str(), ss.str().c_str(), &hints, &result); + if (s != 0){ + hints.ai_family = AF_UNSPEC; + s = getaddrinfo(_host.c_str(), ss.str().c_str(), &hints, &result); + if (s != 0){ + FAIL_MSG("Could not connect SRT socket to %s:%i! Error: %s", _host.c_str(), _port, gai_strmagic(s)); + return res; + } } + for (rp = result; rp != NULL; rp = rp->ai_next){ + size_t maxSize = rp->ai_addrlen; + if (maxSize > sizeof(res)){maxSize = sizeof(res);} + memcpy(&res, rp->ai_addr, maxSize); + break; + } + freeaddrinfo(result); return res; } @@ -91,9 +120,9 @@ namespace Socket{ direction = rhs.direction; remotehost = rhs.remotehost; sock = rhs.sock; + HIGH_MSG("COPIED SRT socket %d", sock); performanceMonitor = rhs.performanceMonitor; host = rhs.host; - outgoing_port = rhs.outgoing_port; prev_pktseq = rhs.prev_pktseq; lastGood = rhs.lastGood; chunkTransmitSize = rhs.chunkTransmitSize; @@ -110,17 +139,94 @@ namespace Socket{ SRTConnection::SRTConnection(const std::string &_host, int _port, const std::string &_direction, const std::map &_params){ + initializeEmpty(); connect(_host, _port, _direction, _params); } + SRTConnection::SRTConnection(Socket::UDPConnection & _udpsocket, const std::string &_direction, const paramList &_params){ + initializeEmpty(); + direction = "output"; + handleConnectionParameters("", _params); + HIGH_MSG("Opening SRT connection in %s mode (%s) on socket %d", modeName.c_str(), direction.c_str(), _udpsocket.getSock()); + + // Copy address from UDP socket + memcpy(&remoteaddr, _udpsocket.getDestAddr(), _udpsocket.getDestAddrLen()); + static char addrconv[INET6_ADDRSTRLEN]; + if (remoteaddr.sin6_family == AF_INET6){ + remotehost = inet_ntop(AF_INET6, &(remoteaddr.sin6_addr), addrconv, INET6_ADDRSTRLEN); + HIGH_MSG("IPv6 addr [%s]", remotehost.c_str()); + } + if (remoteaddr.sin6_family == AF_INET){ + remotehost = inet_ntop(AF_INET, &(((sockaddr_in *)&remoteaddr)->sin_addr), addrconv, INET6_ADDRSTRLEN); + HIGH_MSG("IPv4 addr [%s]", remotehost.c_str()); + } + + sock = srt_create_socket(); + HIGH_MSG("Opened SRT socket %d", sock); + + if (_direction == "rendezvous"){ + bool v = true; + srt_setsockopt(sock, 0, SRTO_RENDEZVOUS, &v, sizeof v); + } + + if (preConfigureSocket() == SRT_ERROR){ + ERROR_MSG("Error configuring SRT socket"); + return; + } + + srt_bind_acquire(sock, _udpsocket.getSock()); + if (sock == SRT_INVALID_SOCK){ + ERROR_MSG("Error creating an SRT socket from bound UDP socket"); + return; + } + + lastGood = Util::bootMS(); + if (_direction == "rendezvous"){return;} + + srt_listen(sock, 1); + SRTSOCKET tmpSock = srt_accept_bond(&sock, 1, 10000); + HIGH_MSG("Opened SRT socket %d", tmpSock); + close(); + sock = tmpSock; + + if (sock == SRT_INVALID_SOCK){ + FAIL_MSG("SRT error: %s", srt_getlasterror_str()); + return; + } + + if (postConfigureSocket() == SRT_ERROR){ + ERROR_MSG("Error during postconfigure socket"); + return; + } + HIGH_MSG("UDP to SRT socket conversion %" PRId32 ": %s", sock, getStateStr()); + } + + const char * SRTConnection::getStateStr(){ + if (sock == INVALID_SRT_SOCKET){return "invalid / closed";} + int state = srt_getsockstate(sock); + switch (state){ + case SRTS_INIT: return "init"; + case SRTS_OPENED: return "opened"; + case SRTS_LISTENING: return "listening"; + case SRTS_CONNECTING: return "connecting"; + case SRTS_CONNECTED: return "connected"; + case SRTS_BROKEN: return "broken"; + case SRTS_CLOSING: return "closing"; + case SRTS_CLOSED: return "closed"; + case SRTS_NONEXIST: return "does not exist"; + } + return "unknown"; + } + SRTConnection::SRTConnection(SRTSOCKET alreadyConnected){ initializeEmpty(); sock = alreadyConnected; + HIGH_MSG("COPIED SRT socket %d", sock); } std::string SRTConnection::getStreamName(){ int sNameLen = 512; - char sName[sNameLen]; + char sName[512]; int optRes = srt_getsockflag(sock, SRTO_STREAMID, (void *)sName, &sNameLen); if (optRes != -1 && sNameLen){return sName;} return ""; @@ -158,8 +264,8 @@ namespace Socket{ } if (err == SRT_ENOCONN){ if (Util::bootMS() > lastGood + 5000){ - ERROR_MSG("SRT connection timed out - closing"); - close(); + ERROR_MSG("SRT connection timed out"); + timedOut = true; } return 0; } @@ -169,6 +275,7 @@ namespace Socket{ } if (receivedBytes == 0){ close(); + return 0; }else{ lastGood = Util::bootMS(); } @@ -188,13 +295,14 @@ namespace Socket{ int err = srt_getlasterror(0); if (err == SRT_EASYNCRCV){return 0;} if (err == SRT_ECONNLOST){ + INFO_MSG("SRT connection %d lost", sock); close(); return 0; } if (err == SRT_ENOCONN){ if (Util::bootMS() > lastGood + 5000){ ERROR_MSG("SRT connection timed out - closing"); - close(); + timedOut = true; } return 0; } @@ -203,7 +311,9 @@ namespace Socket{ return 0; } if (receivedBytes == 0){ + INFO_MSG("SRT connection %d closed", sock); close(); + return 0; }else{ lastGood = Util::bootMS(); } @@ -213,51 +323,43 @@ namespace Socket{ void SRTConnection::connect(const std::string &_host, int _port, const std::string &_direction, const std::map &_params){ - initializeEmpty(); - direction = _direction; - + timedOut = false; handleConnectionParameters(_host, _params); - HIGH_MSG("Opening SRT connection %s in %s mode on %s:%d", modeName.c_str(), direction.c_str(), _host.c_str(), _port); - sock = srt_create_socket(); - if (sock == SRT_ERROR){ - ERROR_MSG("Error creating an SRT socket"); - return; - } - if (modeName == "rendezvous"){ - bool v = true; - srt_setsockopt(sock, 0, SRTO_RENDEZVOUS, &v, sizeof v); - } - if (preConfigureSocket() == SRT_ERROR){ - ERROR_MSG("Error configuring SRT socket"); - return; + if (sock == SRT_INVALID_SOCK){ + sock = srt_create_socket(); + HIGH_MSG("Opened SRT socket %d", sock); + if (sock == SRT_INVALID_SOCK){ + ERROR_MSG("Error creating an SRT socket"); + return; + } + if (preConfigureSocket() == SRT_ERROR){ + ERROR_MSG("Error configuring SRT socket"); + return; + } } if (modeName == "caller"){ - if (outgoing_port){setupAdapter("", outgoing_port);} + std::deque addrs = Socket::getAddrs(_host, _port); + for (std::deque::iterator it = addrs.begin(); it != addrs.end(); ++it){ + size_t maxSize = it->size(); + if (maxSize > sizeof(remoteaddr)){maxSize = sizeof(remoteaddr);} + memcpy(&remoteaddr, it->data(), maxSize); - sockaddr_in sa = createInetAddr(_host, _port); - memcpy(&remoteaddr, &sa, sizeof(sockaddr_in)); - sockaddr *psa = (sockaddr *)&sa; - - HIGH_MSG("Going to connect sock %d", sock); - if (srt_connect(sock, psa, sizeof sa) == SRT_ERROR){ - srt_close(sock); - sock = -1; - ERROR_MSG("Can't connect SRT Socket"); - return; + sockaddr *psa = (sockaddr *)&remoteaddr; + HIGH_MSG("Going to connect sock %d", sock); + if (srt_connect(sock, psa, sizeof remoteaddr) != SRT_ERROR){ + if (postConfigureSocket() == SRT_ERROR){ERROR_MSG("Error during postconfigure socket");} + INFO_MSG("Caller SRT socket %" PRId32 " success targetting %s:%u", sock, _host.c_str(), _port); + lastGood = Util::bootMS(); + return; + } } - HIGH_MSG("Connected sock %d", sock); - - if (postConfigureSocket() == SRT_ERROR){ - ERROR_MSG("Error during postconfigure socket"); - return; - } - INFO_MSG("Caller SRT socket %" PRId32 " success targetting %s:%u", sock, _host.c_str(), _port); - lastGood = Util::bootMS(); + close(); + ERROR_MSG("Can't connect SRT socket to any address for %s", _host.c_str()); return; } if (modeName == "listener"){ @@ -267,61 +369,19 @@ namespace Socket{ sockaddr *psa = (sockaddr *)&sa; if (srt_bind(sock, psa, sizeof sa) == SRT_ERROR){ - srt_close(sock); - sock = -1; + close(); ERROR_MSG("Can't connect SRT Socket: %s", srt_getlasterror_str()); return; } if (srt_listen(sock, 100) == SRT_ERROR){ - srt_close(sock); - sock = -1; + close(); ERROR_MSG("Can not listen on Socket"); } INFO_MSG("Listener SRT socket success @ %s:%u", _host.c_str(), _port); lastGood = Util::bootMS(); return; } - if (modeName == "rendezvous"){ - int outport = (outgoing_port ? outgoing_port : _port); - HIGH_MSG("Going to bind a server on %s:%u", _host.c_str(), _port); - - sockaddr_in sa = createInetAddr(_host, outport); - sockaddr *psa = (sockaddr *)&sa; - - if (srt_bind(sock, psa, sizeof sa) == SRT_ERROR){ - srt_close(sock); - sock = -1; - ERROR_MSG("Can't connect SRT Socket"); - return; - } - - sockaddr_in sb = createInetAddr(_host, outport); - sockaddr *psb = (sockaddr *)&sb; - - if (srt_connect(sock, psb, sizeof sb) == SRT_ERROR){ - srt_close(sock); - sock = -1; - ERROR_MSG("Can't connect SRT Socket"); - return; - } - - if (postConfigureSocket() == SRT_ERROR){ - ERROR_MSG("Error during postconfigure socket"); - return; - } - INFO_MSG("Rendezvous SRT socket success @ %s:%u", _host.c_str(), _port); - lastGood = Util::bootMS(); - return; - } - ERROR_MSG("Invalid mode parameter. Use 'client' or 'server'"); - } - - void SRTConnection::setupAdapter(const std::string &_host, int _port){ - sockaddr_in localsa = createInetAddr(_host, _port); - sockaddr *psa = (sockaddr *)&localsa; - if (srt_bind(sock, psa, sizeof localsa) == SRT_ERROR){ - ERROR_MSG("Unable to bind socket to %s:%u", _host.c_str(), _port); - } + ERROR_MSG("Invalid mode parameter. Use 'caller' or 'listener'"); } void SRTConnection::SendNow(const std::string &data){SendNow(data.data(), data.size());} @@ -338,14 +398,17 @@ namespace Socket{ return; } if (err == SRT_ENOCONN){ - if (Util::bootMS() > lastGood + 5000){ + if (Util::bootMS() > lastGood + 10000){ ERROR_MSG("SRT connection timed out - closing"); - close(); + timedOut = true; } return; } // ERROR_MSG("Unable to send data over socket %" PRId32 ": %s", sock, srt_getlasterror_str()); - if (srt_getsockstate(sock) != SRTS_CONNECTED){close();} + if (srt_getsockstate(sock) != SRTS_CONNECTED){ + close(); + return; + } }else{ lastGood = Util::bootMS(); } @@ -378,9 +441,9 @@ namespace Socket{ memset(&performanceMonitor, 0, sizeof(performanceMonitor)); prev_pktseq = 0; sock = SRT_INVALID_SOCK; - outgoing_port = 0; chunkTransmitSize = 1316; blocking = false; + timedOut = false; timeout = 0; } @@ -397,9 +460,9 @@ namespace Socket{ void SRTConnection::handleConnectionParameters(const std::string &_host, const std::map &_params){ params = _params; - DONTEVEN_MSG("SRT Received parameters: "); + VERYHIGH_MSG("SRT Received parameters: "); for (std::map::const_iterator it = params.begin(); it != params.end(); it++){ - DONTEVEN_MSG(" %s: %s", it->first.c_str(), it->second.c_str()); + VERYHIGH_MSG(" %s: %s", it->first.c_str(), it->second.c_str()); } adapter = (params.count("adapter") ? params.at("adapter") : ""); @@ -417,8 +480,6 @@ namespace Socket{ tsbpdMode = (params.count("tsbpd") && JSON::Value(params.at("tsbpd")).asBool()); - outgoing_port = (params.count("port") ? strtol(params.at("port").c_str(), 0, 0) : 0); - if ((!params.count("transtype") || params.at("transtype") != "file") && chunkTransmitSize > SRT_LIVE_DEF_PLSIZE){ if (chunkTransmitSize > SRT_LIVE_MAX_PLSIZE){ ERROR_MSG("Chunk size in live mode exceeds 1456 bytes!"); @@ -439,12 +500,10 @@ namespace Socket{ } if (srt_setsockopt(sock, 0, SRTO_RCVSYN, &no, sizeof no) == -1){return -1;} - if (params.count("linger")){ - linger lin; - lin.l_linger = atoi(params.at("linger").c_str()); - lin.l_onoff = lin.l_linger > 0 ? 1 : 0; - srt_setsockopt(sock, 0, SRTO_LINGER, &lin, sizeof(linger)); - } + linger lin; + lin.l_linger = params.count("linger") ? atoi(params.at("linger").c_str()) : 0; + lin.l_onoff = lin.l_linger ? 1 : 0; + srt_setsockopt(sock, 0, SRTO_LINGER, &lin, sizeof(linger)); std::string errMsg = configureSocketLoop(SRT::SockOpt::PRE); if (errMsg.size()){ @@ -490,9 +549,11 @@ namespace Socket{ } void SRTConnection::close(){ - if (sock != -1){ + if (sock != INVALID_SRT_SOCKET){ + HIGH_MSG("Closing SRT socket %d", sock); + setBlocking(true); srt_close(sock); - sock = -1; + sock = INVALID_SRT_SOCKET; } } diff --git a/lib/socket_srt.h b/lib/socket_srt.h index c25ec1da..e1e97491 100644 --- a/lib/socket_srt.h +++ b/lib/socket_srt.h @@ -37,12 +37,14 @@ namespace Socket{ SRTConnection(SRTSOCKET alreadyConnected); SRTConnection(const std::string &_host, int _port, const std::string &_direction = "input", const paramList &_params = paramList()); + SRTConnection(Socket::UDPConnection & _udpsocket, const std::string &_direction, const paramList &_params); void connect(const std::string &_host, int _port, const std::string &_direction = "input", const paramList &_params = paramList()); void close(); - bool connected() const{return sock != -1;} + bool connected() const{return (sock != -1) && !timedOut;} operator bool() const{return connected();} + const char * getStateStr(); void setBlocking(bool blocking); ///< Set this socket to be blocking (true) or nonblocking (false). bool isBlocking(); ///< Check if this socket is blocking (true) or nonblocking (false). @@ -77,9 +79,9 @@ namespace Socket{ CBytePerfMon performanceMonitor; std::string host; - int outgoing_port; int32_t prev_pktseq; uint64_t lastGood; + bool timedOut; uint32_t chunkTransmitSize; @@ -94,7 +96,6 @@ namespace Socket{ void handleConnectionParameters(const std::string &_host, const paramList &_params); int preConfigureSocket(); std::string configureSocketLoop(SRT::SockOpt::Binding _binding); - void setupAdapter(const std::string &_host, int _port); bool blocking; }; diff --git a/lib/util.cpp b/lib/util.cpp index e488e4aa..b434194d 100644 --- a/lib/util.cpp +++ b/lib/util.cpp @@ -361,6 +361,20 @@ namespace Util{ maxSize = 0; } + ResizeablePointer::ResizeablePointer(const ResizeablePointer & rhs){ + currSize = 0; + ptr = 0; + maxSize = 0; + append(rhs, rhs.size()); + } + + ResizeablePointer& ResizeablePointer::operator= (const ResizeablePointer& rhs){ + if (this == &rhs){return *this;} + truncate(0); + append(rhs, rhs.size()); + return *this; + } + void ResizeablePointer::shift(size_t byteCount){ //Shifting the entire buffer is easy, we do nothing and set size to zero if (byteCount >= currSize){ @@ -372,6 +386,20 @@ namespace Util{ currSize -= byteCount; } + /// Takes another ResizeablePointer as argument and swaps their pointers around, + /// thus exchanging them without needing to copy anything. + void ResizeablePointer::swap(ResizeablePointer & rhs){ + void * tmpPtr = ptr; + size_t tmpCurrSize = currSize; + size_t tmpMaxSize = maxSize; + ptr = rhs.ptr; + currSize = rhs.currSize; + maxSize = rhs.maxSize; + rhs.ptr = tmpPtr; + rhs.currSize = tmpCurrSize; + rhs.maxSize = tmpMaxSize; + } + bool ResizeablePointer::assign(const void *p, uint32_t l){ if (!allocate(l)){return false;} memcpy(ptr, p, l); @@ -406,12 +434,7 @@ namespace Util{ bool ResizeablePointer::allocate(uint32_t l){ if (l > maxSize){ - void *tmp = 0; - if (!ptr){ - tmp = malloc(l); - }else{ - tmp = realloc(ptr, l); - } + void *tmp = realloc(ptr, l); if (!tmp){ FAIL_MSG("Could not allocate %" PRIu32 " bytes of memory", l); return false; diff --git a/lib/util.h b/lib/util.h index c656edf2..b9d3a178 100644 --- a/lib/util.h +++ b/lib/util.h @@ -48,6 +48,8 @@ namespace Util{ class ResizeablePointer{ public: ResizeablePointer(); + ResizeablePointer(const ResizeablePointer & rhs); + ResizeablePointer& operator= (const ResizeablePointer& rhs); ~ResizeablePointer(); inline size_t &size(){return currSize;} inline const size_t size() const{return currSize;} @@ -57,6 +59,7 @@ namespace Util{ bool append(const std::string &str); bool allocate(uint32_t l); void shift(size_t byteCount); + void swap(ResizeablePointer & rhs); uint32_t rsize(); void truncate(const size_t newLen); inline operator char *(){return (char *)ptr;} diff --git a/meson.build b/meson.build index af8314fb..d3b8c590 100644 --- a/meson.build +++ b/meson.build @@ -100,6 +100,9 @@ message('Building release @0@ for version @1@ @ debug level @2@'.format(release, mist_deps = [] ccpp = meson.get_compiler('cpp') +if ccpp.has_header_symbol('netinet/in.h', 'IPV6_RECVPKTINFO') and ccpp.has_header_symbol('netinet/in.h', 'IP_PKTINFO') + option_defines += '-DHASPKTINFO' +endif if usessl mbedtls = ccpp.find_library('mbedtls', required: false) diff --git a/src/controller/controller_statistics.cpp b/src/controller/controller_statistics.cpp index f64d829d..454c1fe5 100644 --- a/src/controller/controller_statistics.cpp +++ b/src/controller/controller_statistics.cpp @@ -174,13 +174,23 @@ void Controller::updateBandwidthConfig(){ /// This function is ran whenever a stream becomes active. void Controller::streamStarted(std::string stream){ INFO_MSG("Stream %s became active", stream.c_str()); - if (tagQueue.count(stream)){ - tagQueueItem & q = tagQueue[stream]; - for (std::set::iterator it = q.tags.begin(); it != q.tags.end(); ++it){ - streamStats[stream].tags.insert(*it); + { + streamTotals & sT = streamStats[stream]; + JSON::Value strCnf = Util::getStreamConfig(stream); + if (strCnf.isMember("tags")){ + jsonForEachConst(strCnf["tags"], it){ + sT.tags.insert(it->asString()); + } + INFO_MSG("Applied %" PRIu32 " tags to stream %s from config",strCnf["tags"].size() , stream.c_str()); + } + if (tagQueue.count(stream)){ + tagQueueItem & q = tagQueue[stream]; + for (std::set::iterator it = q.tags.begin(); it != q.tags.end(); ++it){ + sT.tags.insert(*it); + } + INFO_MSG("Applied %zu tags to stream %s retroactively",q.tags.size() , stream.c_str()); + tagQueue.erase(stream); } - INFO_MSG("Applied %zu tags to stream %s retroactively",q.tags.size() , stream.c_str()); - tagQueue.erase(stream); } Controller::doAutoPush(stream); } @@ -425,7 +435,7 @@ void Controller::SharedMemStats(void *config){ for (uint64_t cPos = startPos; cPos < endPos; ++cPos){ std::string strm = strmStats->getPointer("stream", cPos); std::string tags = strmStats->getPointer("tags", cPos); - if (tags.size() && streamStats.count(strm)){ + if (strm.size() && tags.size() && streamStats.count(strm)){ INFO_MSG("Restoring stream tags: %s -> %s", strm.c_str(), tags.c_str()); streamTotals & st = streamStats[strm]; while (tags.size()){ @@ -468,7 +478,8 @@ void Controller::SharedMemStats(void *config){ cutOffPoint = 0; } for (std::map::iterator it = sessions.begin(); it != sessions.end(); it++){ - streamStats[it->second.getStreamName()].currSessions++; + std::string strm = it->second.getStreamName(); + if (strm.size()){streamStats[strm].currSessions++;} // This part handles ending sessions, keeping them in cache for now if (it->second.getEnd() < cutOffPoint){ viewSecondsTotal += it->second.getConnTime(); @@ -476,30 +487,32 @@ void Controller::SharedMemStats(void *config){ // Don't count this session as a viewer continue; } - // Recount input, output and viewer type sessions - switch (it->second.getSessType()){ - case SESS_UNSET: break; - case SESS_VIEWER: - if (it->second.hasDataFor(tOut)){ - streamStats[it->second.getStreamName()].currViews++; + if (strm.size()){ + // Recount input, output and viewer type sessions + switch (it->second.getSessType()){ + case SESS_UNSET: break; + case SESS_VIEWER: + if (it->second.hasDataFor(tOut)){ + streamStats[it->second.getStreamName()].currViews++; + } + servSeconds += it->second.getConnTime(); + break; + case SESS_INPUT: + if (it->second.hasDataFor(tIn)){ + streamStats[it->second.getStreamName()].currIns++; + } + break; + case SESS_OUTPUT: + if (it->second.hasDataFor(tOut)){ + streamStats[it->second.getStreamName()].currOuts++; + } + break; + case SESS_UNSPECIFIED: + if (it->second.hasDataFor(tOut)){ + streamStats[it->second.getStreamName()].currUnspecified++; + } + break; } - servSeconds += it->second.getConnTime(); - break; - case SESS_INPUT: - if (it->second.hasDataFor(tIn)){ - streamStats[it->second.getStreamName()].currIns++; - } - break; - case SESS_OUTPUT: - if (it->second.hasDataFor(tOut)){ - streamStats[it->second.getStreamName()].currOuts++; - } - break; - case SESS_UNSPECIFIED: - if (it->second.hasDataFor(tOut)){ - streamStats[it->second.getStreamName()].currUnspecified++; - } - break; } } while (mustWipe.size()){ @@ -753,14 +766,15 @@ void Controller::statSession::update(uint64_t index, Comms::Sessions &statComm){ } } // Only count connections that are countable - if (noBWCount != 2){ + if (noBWCount != 2 && streamName.size()){ createEmptyStatsIfNeeded(streamName); - streamStats[streamName].upBytes += currUp - prevUp; - streamStats[streamName].downBytes += currDown - prevDown; - streamStats[streamName].packSent += currPktSent - prevPktSent; - streamStats[streamName].packLoss += currPktLost - prevPktLost; - streamStats[streamName].packRetrans += currPktRetrans - prevPktRetrans; - if (sessionType == SESS_VIEWER){streamStats[streamName].viewSeconds += secIncr;} + streamTotals & sT = streamStats[streamName]; + sT.upBytes += currUp - prevUp; + sT.downBytes += currDown - prevDown; + sT.packSent += currPktSent - prevPktSent; + sT.packLoss += currPktLost - prevPktLost; + sT.packRetrans += currPktRetrans - prevPktRetrans; + if (sessionType == SESS_VIEWER){sT.viewSeconds += secIncr;} } } diff --git a/src/input/input_tssrt.cpp b/src/input/input_tssrt.cpp index fc1e1644..a9b39662 100644 --- a/src/input/input_tssrt.cpp +++ b/src/input/input_tssrt.cpp @@ -1,8 +1,10 @@ #include "input_tssrt.h" +#include #include #include #include #include +#include #include #include #include @@ -13,11 +15,9 @@ #include #include #include -#include - +#include #include #include -#include Util::Config *cfgPointer = NULL; std::string baseStreamName; @@ -33,7 +33,6 @@ void signal_handler(int signum, siginfo_t *sigInfo, void *ignore){ } } - // We use threads here for multiple input pushes, because of the internals of the SRT Library static void callThreadCallbackSRT(void *socknum){ // use the accepted socket as the second parameter @@ -47,6 +46,8 @@ namespace Mist{ /// \arg cfg Util::Config that contains all current configurations. InputTSSRT::InputTSSRT(Util::Config *cfg, Socket::SRTConnection s) : Input(cfg){ rawIdx = INVALID_TRACK_ID; + udpInit = 0; + srtConn = 0; lastRawPacket = 0; bootMSOffsetCalculated = false; assembler.setLive(); @@ -136,16 +137,17 @@ namespace Mist{ // Setup if we are called form with a thread for push-based input. if (s.connected()){ - srtConn = s; + srtConn = new Socket::SRTConnection(s); streamName = baseStreamName; - std::string streamid = srtConn.getStreamName(); + std::string streamid = srtConn->getStreamName(); int64_t acc = config->getInteger("acceptable"); if (acc == 0){ if (streamid.size()){streamName += "+" + streamid;} }else if(acc == 2){ if (streamName != streamid){ FAIL_MSG("Stream ID '%s' does not match stream name, push blocked", streamid.c_str()); - srtConn.close(); + srtConn->close(); + srtConn = 0; } } Util::setStreamName(streamName); @@ -169,46 +171,49 @@ namespace Mist{ Socket::SRT::libraryInit(); rawMode = config->getBool("raw"); if (rawMode){INFO_MSG("Entering raw mode");} - if (srtConn.getSocket() == -1){ - std::string source = config->getString("input"); - standAlone = false; - HTTP::URL u(source); - INFO_MSG("Parsed url: %s", u.getUrl().c_str()); - if (Socket::interpretSRTMode(u) == "listener"){ - std::map arguments; - HTTP::parseVars(u.args, arguments); - sSock = Socket::SRTServer(u.getPort(), u.host, arguments, false); - struct sigaction new_action; - struct sigaction cur_action; - new_action.sa_sigaction = signal_handler; - sigemptyset(&new_action.sa_mask); - new_action.sa_flags = SA_SIGINFO; - sigaction(SIGINT, &new_action, &cur_action); - if (cur_action.sa_sigaction && cur_action.sa_sigaction != oldSignal){ - if (oldSignal){WARN_MSG("Multiple signal handlers! I can't deal with this.");} - oldSignal = cur_action.sa_sigaction; - } - sigaction(SIGHUP, &new_action, &cur_action); - if (cur_action.sa_sigaction && cur_action.sa_sigaction != oldSignal){ - if (oldSignal){WARN_MSG("Multiple signal handlers! I can't deal with this.");} - oldSignal = cur_action.sa_sigaction; - } - sigaction(SIGTERM, &new_action, &cur_action); - if (cur_action.sa_sigaction && cur_action.sa_sigaction != oldSignal){ - if (oldSignal){WARN_MSG("Multiple signal handlers! I can't deal with this.");} - oldSignal = cur_action.sa_sigaction; - } - }else{ - std::map arguments; - HTTP::parseVars(u.args, arguments); - size_t connectCnt = 0; - do{ - srtConn.connect(u.host, u.getPort(), "input", arguments); - if (!srtConn){Util::sleep(1000);} - ++connectCnt; - }while (!srtConn && connectCnt < 10); - if (!srtConn){WARN_MSG("Connecting to %s timed out", u.getUrl().c_str());} + if (srtConn && *srtConn){return true;} + std::string source = config->getString("input"); + standAlone = false; + HTTP::URL u(source); + INFO_MSG("Parsed url: %s", u.getUrl().c_str()); + if (Socket::interpretSRTMode(u) == "listener"){ + std::map arguments; + HTTP::parseVars(u.args, arguments); + sSock = Socket::SRTServer(u.getPort(), u.host, arguments, false); + struct sigaction new_action; + struct sigaction cur_action; + new_action.sa_sigaction = signal_handler; + sigemptyset(&new_action.sa_mask); + new_action.sa_flags = SA_SIGINFO; + sigaction(SIGINT, &new_action, &cur_action); + if (cur_action.sa_sigaction && cur_action.sa_sigaction != oldSignal){ + if (oldSignal){WARN_MSG("Multiple signal handlers! I can't deal with this.");} + oldSignal = cur_action.sa_sigaction; } + sigaction(SIGHUP, &new_action, &cur_action); + if (cur_action.sa_sigaction && cur_action.sa_sigaction != oldSignal){ + if (oldSignal){WARN_MSG("Multiple signal handlers! I can't deal with this.");} + oldSignal = cur_action.sa_sigaction; + } + sigaction(SIGTERM, &new_action, &cur_action); + if (cur_action.sa_sigaction && cur_action.sa_sigaction != oldSignal){ + if (oldSignal){WARN_MSG("Multiple signal handlers! I can't deal with this.");} + oldSignal = cur_action.sa_sigaction; + } + }else{ + std::map arguments; + HTTP::parseVars(u.args, arguments); + + std::string addData; + if (arguments.count("streamid")){addData = arguments["streamid"];} + size_t connectCnt = 0; + do{ + if (!srtConn){srtConn = new Socket::SRTConnection();} + srtConn->connect(u.host, u.getPort(), "input", arguments); + if (!*srtConn){Util::sleep(1000);} + ++connectCnt; + }while (!*srtConn && connectCnt < 10); + if (!*srtConn){WARN_MSG("Connecting to %s timed out", u.getUrl().c_str());} } return true; } @@ -217,13 +222,13 @@ namespace Mist{ void InputTSSRT::getNext(size_t idx){ thisPacket.null(); bool hasPacket = tsStream.hasPacket(); - while (!hasPacket && srtConn && config->is_active){ + while (!hasPacket && srtConn && *srtConn && config->is_active){ - size_t recvSize = srtConn.RecvNow(); + size_t recvSize = srtConn->RecvNow(); if (recvSize){ if (rawMode){ keepAlive(); - rawBuffer.append(srtConn.recvbuf, recvSize); + rawBuffer.append(srtConn->recvbuf, recvSize); if (rawBuffer.size() >= 1316 && (lastRawPacket == 0 || lastRawPacket != Util::bootMS())){ if (rawIdx == INVALID_TRACK_ID){ rawIdx = meta.addTrack(); @@ -240,8 +245,8 @@ namespace Mist{ } continue; } - if (assembler.assemble(tsStream, srtConn.recvbuf, recvSize, true)){hasPacket = tsStream.hasPacket();} - }else if (srtConn){ + if (assembler.assemble(tsStream, srtConn->recvbuf, recvSize, true)){hasPacket = tsStream.hasPacket();} + }else if (srtConn && *srtConn){ // This should not happen as the SRT socket is read blocking and won't return until there is // data. But if it does, wait before retry Util::sleep(10); @@ -250,7 +255,7 @@ namespace Mist{ if (hasPacket){tsStream.getEarliestPacket(thisPacket);} if (!thisPacket){ - if (srtConn){ + if (srtConn && *srtConn){ INFO_MSG("Could not getNext TS packet!"); Util::logExitReason(ER_FORMAT_SPECIFIC, "internal TS parser error"); }else{ @@ -286,7 +291,7 @@ namespace Mist{ void InputTSSRT::streamMainLoop(){ // If we do not have a srtConn here, we are the main thread and should start accepting pushes. - if (srtConn.getSocket() == -1){ + if (!srtConn || !*srtConn){ cfgPointer = config; baseStreamName = streamName; while (config->is_active && sSock.connected()){ @@ -304,7 +309,7 @@ namespace Mist{ } // If we are here: we have a proper connection (either accepted or pull input) and should start parsing it as such Input::streamMainLoop(); - srtConn.close(); + srtConn->close(); Socket::SRT::libraryCleanup(); } @@ -313,11 +318,11 @@ namespace Mist{ void InputTSSRT::setSingular(bool newSingular){singularFlag = newSingular;} void InputTSSRT::connStats(Comms::Connections &statComm){ - statComm.setUp(srtConn.dataUp()); - statComm.setDown(srtConn.dataDown()); - statComm.setPacketCount(srtConn.packetCount()); - statComm.setPacketLostCount(srtConn.packetLostCount()); - statComm.setPacketRetransmitCount(srtConn.packetRetransmitCount()); + statComm.setUp(srtConn->dataUp()); + statComm.setDown(srtConn->dataDown()); + statComm.setPacketCount(srtConn->packetCount()); + statComm.setPacketLostCount(srtConn->packetLostCount()); + statComm.setPacketRetransmitCount(srtConn->packetRetransmitCount()); } }// namespace Mist diff --git a/src/input/input_tssrt.h b/src/input/input_tssrt.h index 786eb165..2b8922a0 100644 --- a/src/input/input_tssrt.h +++ b/src/input/input_tssrt.h @@ -4,7 +4,6 @@ #include #include #include -#include #include namespace Mist{ @@ -16,7 +15,7 @@ namespace Mist{ void setSingular(bool newSingular); virtual bool needsLock(); virtual std::string getConnectedBinHost(){ - if (srtConn){return srtConn.getBinHost();} + if (srtConn && *srtConn){return srtConn->getBinHost();} return Input::getConnectedBinHost(); } @@ -38,7 +37,8 @@ namespace Mist{ int64_t timeStampOffset; uint64_t lastTimeStamp; - Socket::SRTConnection srtConn; + Socket::SRTConnection * srtConn; + Socket::UDPConnection * udpInit; bool singularFlag; virtual void connStats(Comms::Connections &statComm); diff --git a/src/output/meson.build b/src/output/meson.build index 80a3c8fe..bbb99393 100644 --- a/src/output/meson.build +++ b/src/output/meson.build @@ -37,7 +37,7 @@ if have_librist endif if have_srt - outputs += {'name' : 'TSSRT', 'format' : 'tssrt', 'extra': ['ts', 'debased', 'with_srt']} + outputs += {'name' : 'TSSRT', 'format' : 'tssrt', 'extra': ['ts', 'with_srt']} endif if get_option('WITH_SANITY') diff --git a/src/output/output.cpp b/src/output/output.cpp index b42e7ce1..e14b352a 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -93,6 +93,7 @@ namespace Mist{ Output::Output(Socket::Connection &conn) : myConn(conn){ dataWaitTimeout = 2500; + thisBootMs = Util::bootMS(); pushing = false; recursingSync = false; firstTime = Util::bootMS(); diff --git a/src/output/output_ts.cpp b/src/output/output_ts.cpp index 0631aa36..de9bd951 100644 --- a/src/output/output_ts.cpp +++ b/src/output/output_ts.cpp @@ -74,7 +74,7 @@ namespace Mist{ udpSize = 7; if (targetParams.count("tracks")){tracks = targetParams["tracks"];} if (targetParams.count("pkts")){udpSize = atoi(targetParams["pkts"].c_str());} - packetBuffer.reserve(188 * udpSize); + packetBuffer.allocate(188 * udpSize); if (target.path.size()){ if (!pushSock.bind(0, target.path)){ disconnect(); @@ -230,7 +230,7 @@ namespace Mist{ if (wrapRTP){ // Send RTP packet itself if (rand() % 100 >= dropPercentage){ - tsOut.sendTS(&pushSock, packetBuffer.c_str(), packetBuffer.size()); + tsOut.sendTS(&pushSock, packetBuffer, packetBuffer.size()); myConn.addUp(tsOut.getHsize() + tsOut.getPayloadSize()); } else { INFO_MSG("Dropping RTP packet in order to simulate packet loss"); @@ -239,15 +239,14 @@ namespace Mist{ if (sendFEC){ // Send FEC packet if available uint64_t bytesSent = 0; - tsOut.parseFEC(&fecColumnSock, &fecRowSock, bytesSent, packetBuffer.c_str(), packetBuffer.size()); + tsOut.parseFEC(&fecColumnSock, &fecRowSock, bytesSent, packetBuffer, packetBuffer.size()); myConn.addUp(bytesSent); } }else{ - pushSock.SendNow(packetBuffer); + pushSock.SendNow(packetBuffer, packetBuffer.size()); myConn.addUp(packetBuffer.size()); } - packetBuffer.clear(); - packetBuffer.reserve(udpSize * len); + packetBuffer.truncate(0); curFilled = 0; } packetBuffer.append(tsData, len); diff --git a/src/output/output_ts.h b/src/output/output_ts.h index 9edb5c02..a32abb0b 100644 --- a/src/output/output_ts.h +++ b/src/output/output_ts.h @@ -21,7 +21,7 @@ namespace Mist{ bool wrapRTP; bool sendFEC; void onRTP(void *socket, const char *data, size_t nbytes); - std::string packetBuffer; + Util::ResizeablePointer packetBuffer; Socket::UDPConnection pushSock; Socket::UDPConnection fecColumnSock; Socket::UDPConnection fecRowSock; diff --git a/src/output/output_ts_base.cpp b/src/output/output_ts_base.cpp index 7b414229..91b8bfe4 100644 --- a/src/output/output_ts_base.cpp +++ b/src/output/output_ts_base.cpp @@ -8,6 +8,7 @@ namespace Mist{ setBlocking(true); sendRepeatingHeaders = 0; lastHeaderTime = 0; + maxSkipAhead = 0; } void TSOutput::fillPacket(char const *data, size_t dataLen, bool &firstPack, bool video, @@ -70,6 +71,8 @@ namespace Mist{ return; } } + if (liveSeek(true)){return;} + if (!M.trackLoaded(thisIdx)){return;} // Get ready some data to speed up accesses std::string type = M.getType(thisIdx); std::string codec = M.getCodec(thisIdx); diff --git a/src/output/output_tssrt.cpp b/src/output/output_tssrt.cpp index 8f35ff7d..d2abed7f 100644 --- a/src/output/output_tssrt.cpp +++ b/src/output/output_tssrt.cpp @@ -6,11 +6,13 @@ #include #include #include +#include bool allowStreamNameOverride = true; namespace Mist{ - OutTSSRT::OutTSSRT(Socket::Connection &conn, Socket::SRTConnection & _srtSock) : TSOutput(conn), srtConn(_srtSock){ + OutTSSRT::OutTSSRT(Socket::Connection &conn, Socket::SRTConnection * _srtSock) : TSOutput(conn){ + srtConn = _srtSock; // NOTE: conn is useless for SRT, as it uses a different socket type. sendRepeatingHeaders = 500; // PAT/PMT every 500ms (DVB spec) streamName = config->getString("streamname"); @@ -18,9 +20,14 @@ namespace Mist{ pushOut = false; bootMSOffsetCalculated = false; assembler.setLive(); + udpInit = 0; // Push output configuration if (config->getString("target").size()){ + Socket::SRT::libraryInit(); target = HTTP::URL(config->getString("target")); + HTTP::parseVars(target.args, targetParams); + std::string addData; + if (targetParams.count("streamid")){addData = targetParams["streamid"];} if (target.protocol != "srt"){ FAIL_MSG("Target %s must begin with srt://, aborting", target.getUrl().c_str()); onFail("Invalid srt target: doesn't start with srt://", true); @@ -32,28 +39,27 @@ namespace Mist{ return; } pushOut = true; - HTTP::parseVars(target.args, targetParams); size_t connectCnt = 0; do{ - srtConn.connect(target.host, target.getPort(), "output", targetParams); - if (!srtConn){ + if (!srtConn){srtConn = new Socket::SRTConnection();} + if (srtConn){srtConn->connect(target.host, target.getPort(), "output", targetParams);} + if (!*srtConn){ Util::sleep(1000); }else{ - INFO_MSG("Connect success on attempt %zu", connectCnt+1); + INFO_MSG("SRT socket %s on attempt %zu", srtConn->getStateStr(), connectCnt+1); break; } ++connectCnt; - }while (!srtConn && connectCnt < 5); + }while ((!srtConn || !*srtConn) && connectCnt < 5); if (!srtConn){ FAIL_MSG("Failed to connect to '%s'!", config->getString("target").c_str()); } wantRequest = false; parseData = true; - initialize(); }else{ // Pull output configuration, In this case we have an srt connection in the second constructor parameter. // Handle override / append of streamname options - std::string sName = srtConn.getStreamName(); + std::string sName = srtConn->getStreamName(); if (allowStreamNameOverride){ if (sName != ""){ streamName = sName; @@ -61,18 +67,19 @@ namespace Mist{ Util::setStreamName(streamName); } } + myConn.setHost(srtConn->remotehost); int64_t accTypes = config->getInteger("acceptable"); if (accTypes == 0){//Allow both directions - srtConn.setBlocking(false); + srtConn->setBlocking(false); //Try to read the socket 10 times. If any reads succeed, assume they are pushing in size_t retries = 60; - while (!accTypes && srtConn && retries){ - size_t recvSize = srtConn.Recv(); + while (!accTypes && *srtConn && retries){ + size_t recvSize = srtConn->Recv(); if (recvSize){ accTypes = 2; INFO_MSG("Connection put into ingest mode"); - assembler.assemble(tsIn, srtConn.recvbuf, recvSize, true); + assembler.assemble(tsIn, srtConn->recvbuf, recvSize, true); }else{ Util::sleep(50); } @@ -85,14 +92,14 @@ namespace Mist{ } } if (accTypes == 1){// Only allow outgoing - srtConn.setBlocking(true); - srtConn.direction = "output"; + srtConn->setBlocking(true); + srtConn->direction = "output"; parseData = true; wantRequest = false; initialize(); }else if (accTypes == 2){//Only allow incoming - srtConn.setBlocking(false); - srtConn.direction = "input"; + srtConn->setBlocking(false); + srtConn->direction = "input"; if (Triggers::shouldTrigger("PUSH_REWRITE")){ HTTP::URL reqUrl; reqUrl.protocol = "srt"; @@ -115,7 +122,11 @@ namespace Mist{ Util::sanitizeName(streamName); } } - myConn.setHost(srtConn.remotehost); + if (!streamName.size()){ + Util::logExitReason(ER_FORMAT_SPECIFIC, "Push from %s rejected - there is no stream name set", getConnectedHost().c_str()); + onFinish(); + return; + } if (!allowPush("")){ onFinish(); return; @@ -128,17 +139,48 @@ namespace Mist{ } } + lastWorked = Util::bootSecs(); lastTimeStamp = 0; timeStampOffset = 0; } bool OutTSSRT::onFinish(){ - myConn.close(); - srtConn.close(); + config->is_active = false; return false; } - OutTSSRT::~OutTSSRT(){} + OutTSSRT::~OutTSSRT(){ + if(srtConn){ + srtConn->close(); + delete srtConn; + srtConn = 0; + } + Socket::SRT::libraryCleanup(); + } + + // Override initialSeek to go to last possible position for live streams + void OutTSSRT::initialSeek(bool dryRun){ + if (!meta){return;} + meta.removeLimiter(); + + uint64_t seekPos = 0; + + std::set validTracks = M.getValidTracks(); + if (M.getLive() && validTracks.size()){ + if (userSelect.size()){ + for (std::map::iterator it = userSelect.begin(); it != userSelect.end(); it++){ + if (M.trackValid(it->first) && (M.getNowms(it->first) < seekPos || !seekPos)){ + seekPos = meta.getNowms(it->first); + } + } + }else{ + for (std::set::iterator it = validTracks.begin(); it != validTracks.end(); it++){ + if (meta.getNowms(*it) < seekPos || !seekPos){seekPos = meta.getNowms(*it);} + } + } + } + seek(seekPos); + } static void addIntOpt(JSON::Value & pp, const std::string & param, const std::string & name, const std::string & help, size_t def = 0){ pp[param]["name"] = name; @@ -190,14 +232,6 @@ namespace Mist{ capa["optional"]["streamname"]["short"] = "s"; capa["optional"]["streamname"]["default"] = ""; - capa["optional"]["filelimit"]["name"] = "Open file descriptor limit"; - capa["optional"]["filelimit"]["help"] = "Increase open file descriptor to this value if current system value is lower. A higher value may be needed for handling many concurrent SRT connections."; - - capa["optional"]["filelimit"]["type"] = "int"; - capa["optional"]["filelimit"]["option"] = "--filelimit"; - capa["optional"]["filelimit"]["short"] = "l"; - capa["optional"]["filelimit"]["default"] = "1024"; - capa["optional"]["acceptable"]["name"] = "Acceptable connection types"; capa["optional"]["acceptable"]["help"] = "Whether to allow only incoming pushes (2), only outgoing pulls (1), or both (0, default)"; @@ -313,32 +347,81 @@ namespace Mist{ opt["default"] = ""; opt["help"] = "Which parser to use for data tracks"; config->addOption("datatrack", opt); + + capa["optional"]["passphrase"]["name"] = "Passphrase"; + capa["optional"]["passphrase"]["help"] = "If set, requires a SRT passphrase to connect"; + capa["optional"]["passphrase"]["type"] = "string"; + capa["optional"]["passphrase"]["option"] = "--passphrase"; + capa["optional"]["passphrase"]["short"] = "P"; + capa["optional"]["passphrase"]["default"] = ""; + + opt.null(); + opt["long"] = "passphrase"; + opt["short"] = "P"; + opt["arg"] = "string"; + opt["default"] = ""; + opt["help"] = "If set, requires a SRT passphrase to connect"; + config->addOption("passphrase", opt); + + capa["optional"]["sockopts"]["name"] = "SRT socket options"; + capa["optional"]["sockopts"]["help"] = "Any additional SRT socket options to apply"; + capa["optional"]["sockopts"]["type"] = "string"; + capa["optional"]["sockopts"]["option"] = "--sockopts"; + capa["optional"]["sockopts"]["short"] = "O"; + capa["optional"]["sockopts"]["default"] = ""; + + opt.null(); + opt["long"] = "sockopts"; + opt["short"] = "O"; + opt["arg"] = "string"; + opt["default"] = ""; + opt["help"] = "Any additional SRT socket options to apply"; + config->addOption("sockopts", opt); + + } // Buffers TS packets and sends after 7 are buffered. void OutTSSRT::sendTS(const char *tsData, size_t len){ packetBuffer.append(tsData, len); if (packetBuffer.size() >= 1316){//7 whole TS packets - if (!srtConn){ + if (!*srtConn){ if (config->getString("target").size()){ + if (lastWorked + 5 < Util::bootSecs()){ + Util::logExitReason(ER_CLEAN_REMOTE_CLOSE, "SRT connection closed, no reconnect success after 5s"); + config->is_active = false; + parseData = false; + return; + } INFO_MSG("Reconnecting..."); - srtConn.connect(target.host, target.getPort(), "output", targetParams); - if (!srtConn){Util::sleep(500);} + if (srtConn){ + srtConn->close(); + delete srtConn; + } + if (udpInit){ + srtConn = new Socket::SRTConnection(*udpInit, "rendezvous", targetParams); + }else{ + srtConn = new Socket::SRTConnection(); + } + srtConn->connect(target.host, target.getPort(), "output", targetParams); + if (!*srtConn){Util::sleep(500);} }else{ - Util::logExitReason(ER_CLEAN_REMOTE_CLOSE, "SRT connection closed"); - myConn.close(); + Util::logExitReason(ER_CLEAN_REMOTE_CLOSE, "SRT connection closed (mid-send)"); + config->is_active = false; parseData = false; return; } } - if (srtConn){ - srtConn.SendNow(packetBuffer, packetBuffer.size()); - if (!srtConn){ + if (*srtConn){ + srtConn->SendNow(packetBuffer, packetBuffer.size()); + if (!*srtConn){ if (!config->getString("target").size()){ - Util::logExitReason(ER_CLEAN_REMOTE_CLOSE, "SRT connection closed"); - myConn.close(); + Util::logExitReason(ER_CLEAN_REMOTE_CLOSE, "SRT connection closed (post-send)"); + config->is_active = false; parseData = false; } + }else{ + lastWorked = Util::bootSecs(); } } packetBuffer.assign(0,0); @@ -346,11 +429,12 @@ namespace Mist{ } void OutTSSRT::requestHandler(){ - size_t recvSize = srtConn.Recv(); + size_t recvSize = srtConn->Recv(); if (!recvSize){ - if (!srtConn){ - myConn.close(); - srtConn.close(); + if (!*srtConn){ + Util::logExitReason(ER_CLEAN_REMOTE_CLOSE, "SRT connection %s (in request handler)", srtConn->getStateStr()); + config->is_active = false; + srtConn->close(); wantRequest = false; }else{ Util::sleep(50); @@ -358,13 +442,13 @@ namespace Mist{ return; } lastRecv = Util::bootSecs(); - if (!assembler.assemble(tsIn, srtConn.recvbuf, recvSize, true)){return;} + if (!assembler.assemble(tsIn, srtConn->recvbuf, recvSize, true)){return;} while (tsIn.hasPacket()){ tsIn.getEarliestPacket(thisPacket); if (!thisPacket){ - INFO_MSG("Could not get TS packet"); - myConn.close(); - srtConn.close(); + Util::logExitReason(ER_FORMAT_SPECIFIC, "Could not get TS packet"); + config->is_active = false; + srtConn->close(); wantRequest = false; return; } @@ -380,7 +464,7 @@ namespace Mist{ size_t thisIdx = M.trackIDToIndex(thisPacket.getTrackId(), getpid()); if (thisIdx == INVALID_TRACK_ID){return;} if (!userSelect.count(thisIdx)){ - userSelect[thisIdx].reload(streamName, thisIdx, COMM_STATUS_SOURCE | COMM_STATUS_DONOTTRACK); + userSelect[thisIdx].reload(streamName, thisIdx, COMM_STATUS_ACTIVE | COMM_STATUS_SOURCE | COMM_STATUS_DONOTTRACK); } uint64_t pktTimeWithOffset = thisPacket.getTime() + timeStampOffset; @@ -406,164 +490,126 @@ namespace Mist{ bool OutTSSRT::dropPushTrack(uint32_t trackId, const std::string & dropReason){ Util::logExitReason(ER_SHM_LOST, "track dropped by buffer"); - myConn.close(); - srtConn.close(); + config->is_active = false; + if (srtConn){srtConn->close();} return Output::dropPushTrack(trackId, dropReason); } void OutTSSRT::connStats(uint64_t now, Comms::Connections &statComm){ - if (!srtConn){return;} - statComm.setUp(srtConn.dataUp()); - statComm.setDown(srtConn.dataDown()); - statComm.setTime(now - srtConn.connTime()); - statComm.setPacketCount(srtConn.packetCount()); - statComm.setPacketLostCount(srtConn.packetLostCount()); - statComm.setPacketRetransmitCount(srtConn.packetRetransmitCount()); + if (!srtConn || !*srtConn){return;} + statComm.setUp(srtConn->dataUp()); + statComm.setDown(srtConn->dataDown()); + statComm.setTime(now - srtConn->connTime()); + statComm.setPacketCount(srtConn->packetCount()); + statComm.setPacketLostCount(srtConn->packetLostCount()); + statComm.setPacketRetransmitCount(srtConn->packetRetransmitCount()); } -}// namespace Mist - - -Socket::SRTServer server_socket; -static uint64_t sockCount = 0; - -void (*oldSignal)(int, siginfo_t *,void *) = 0; -void signal_handler(int signum, siginfo_t *sigInfo, void *ignore){ - server_socket.close(); - if (oldSignal){ - oldSignal(signum, sigInfo, ignore); + bool OutTSSRT::listenMode(){ + std::string tgt = config->getString("target"); + return (!tgt.size() || (tgt.size() >= 6 && tgt.substr(0, 6) == "srt://" && Socket::interpretSRTMode(HTTP::URL(tgt)) == "listener")); } -} -void handleUSR1(int signum, siginfo_t *sigInfo, void *ignore){ - if (!sockCount){ - INFO_MSG("USR1 received - triggering rolling restart (no connections active)"); - Util::Config::is_restarting = true; - Util::logExitReason(ER_CLEAN_SIGNAL, "signal USR1, no connections"); - server_socket.close(); - Util::Config::is_active = false; - }else{ - INFO_MSG("USR1 received - triggering rolling restart when connection count reaches zero"); - Util::Config::is_restarting = true; - Util::logExitReason(ER_CLEAN_SIGNAL, "signal USR1, after disconnect wait"); - } -} - -// Callback for SRT-serving threads -static void callThreadCallbackSRT(void *srtPtr){ - sockCount++; - Socket::SRTConnection & srtSock = *(Socket::SRTConnection*)srtPtr; - int fds[2]; - pipe(fds); - Socket::Connection Sconn(fds[0], fds[1]); - HIGH_MSG("Started thread for socket %i", srtSock.getSocket()); - mistOut tmp(Sconn,srtSock); - tmp.run(); - HIGH_MSG("Closing thread for socket %i", srtSock.getSocket()); - Sconn.close(); - srtSock.close(); - delete &srtSock; - sockCount--; - if (!sockCount && Util::Config::is_restarting){ - server_socket.close(); - Util::Config::is_active = false; - INFO_MSG("Last active connection closed; triggering rolling restart now!"); - } -} - -int main(int argc, char *argv[]){ - Socket::SRT::libraryInit(); - DTSC::trackValidMask = TRACK_VALID_EXT_HUMAN; - Util::redirectLogsIfNeeded(); - Util::Config conf(argv[0]); - Util::Config::binaryType = Util::OUTPUT; - mistOut::init(&conf); - if (conf.parseArgs(argc, argv)){ - if (conf.getBool("json")){ - mistOut::capa["version"] = PACKAGE_VERSION; - std::cout << mistOut::capa.toString() << std::endl; - return -1; + void initSRTConnection(Socket::UDPConnection & s, std::map & arguments, bool listener = true){ + Socket::SRT::libraryInit(); + Socket::SRTConnection * tmpSock = new Socket::SRTConnection(s, listener?"output":"rendezvous", arguments); + if (!*tmpSock){ + delete tmpSock; + return; } - conf.activate(); - - int filelimit = conf.getInteger("filelimit"); - Util::sysSetNrOpenFiles(filelimit); + if (!listener){ + std::string host; + uint32_t port; + s.GetDestination(host, port); + tmpSock->connect(host, port, "output", arguments); + INFO_MSG("UDP to SRT socket conversion: %s", tmpSock->getStateStr()); + } + Socket::Connection S(1, 0); + mistOut tmp(S, tmpSock); + tmp.run(); + } + void OutTSSRT::listener(Util::Config &conf, int (*callback)(Socket::Connection &S)){ + // Check SRT options/arguments first std::string target = conf.getString("target"); - if (!mistOut::listenMode() && (!target.size() || Socket::interpretSRTMode(HTTP::URL(target)) != "listener")){ - Socket::Connection S(fileno(stdout), fileno(stdin)); - Socket::SRTConnection tmpSock; - mistOut tmp(S, tmpSock); - return tmp.run(); - } - { - struct sigaction new_action; - new_action.sa_sigaction = handleUSR1; - sigemptyset(&new_action.sa_mask); - new_action.sa_flags = 0; - sigaction(SIGUSR1, &new_action, NULL); - } + std::map arguments; if (target.size()){ //Force acceptable option to 1 (outgoing only), since this is a push output and we can't accept incoming connections conf.getOption("acceptable", true).append((uint64_t)1); //Disable overriding streamname with streamid parameter on other side allowStreamNameOverride = false; HTTP::URL tgt(target); - std::map arguments; HTTP::parseVars(tgt.args, arguments); - server_socket = Socket::SRTServer(tgt.getPort(), tgt.host, arguments, false, "output"); + conf.getOption("interface", true).append(tgt.host); + conf.getOption("port", true).append((uint64_t)tgt.getPort()); conf.getOption("target", true).append(""); }else{ - std::map arguments; - server_socket = Socket::SRTServer(conf.getInteger("port"), conf.getString("interface"), arguments, false, "output"); + HTTP::parseVars(conf.getString("sockopts"), arguments); + std::string opt = conf.getString("passphrase"); + if (opt.size()){arguments["passphrase"] = opt;} } - if (!server_socket.connected()){ - DEVEL_MSG("Failure to open socket"); - return 1; + + uint16_t localPort; + // Either re-use socket 0 or bind a new socket + Socket::UDPConnection udpSrv; + if (Socket::checkTrueSocket(0)){ + udpSrv.assimilate(0); + localPort = udpSrv.getBoundPort(); + }else{ + localPort = udpSrv.bind(conf.getInteger("port"), conf.getString("interface")); } - struct sigaction new_action; - struct sigaction cur_action; - new_action.sa_sigaction = signal_handler; - sigemptyset(&new_action.sa_mask); - new_action.sa_flags = SA_SIGINFO; - sigaction(SIGINT, &new_action, &cur_action); - if (cur_action.sa_sigaction && cur_action.sa_sigaction != oldSignal){ - if (oldSignal){WARN_MSG("Multiple signal handlers! I can't deal with this.");} - oldSignal = cur_action.sa_sigaction; - } - sigaction(SIGHUP, &new_action, &cur_action); - if (cur_action.sa_sigaction && cur_action.sa_sigaction != oldSignal){ - if (oldSignal){WARN_MSG("Multiple signal handlers! I can't deal with this.");} - oldSignal = cur_action.sa_sigaction; - } - sigaction(SIGTERM, &new_action, &cur_action); - if (cur_action.sa_sigaction && cur_action.sa_sigaction != oldSignal){ - if (oldSignal){WARN_MSG("Multiple signal handlers! I can't deal with this.");} - oldSignal = cur_action.sa_sigaction; - } - Comms::defaultCommFlags = COMM_STATUS_NOKILL; - Util::Procs::socketList.insert(server_socket.getSocket()); - while (conf.is_active && server_socket.connected()){ - Socket::SRTConnection S = server_socket.accept(false, "output"); - if (S.connected()){// check if the new connection is valid - // spawn a new thread for this connection - tthread::thread T(callThreadCallbackSRT, (void *)new Socket::SRTConnection(S)); - // detach it, no need to keep track of it anymore - T.detach(); - }else{ - Util::sleep(10); // sleep 10ms + // Ensure socket zero is now us + if (udpSrv.getSock()){ + int oldSock = udpSrv.getSock(); + if (!dup2(oldSock, 0)){ + udpSrv.assimilate(0); } } - Util::Procs::socketList.erase(server_socket.getSocket()); - server_socket.close(); - if (conf.is_restarting){ - INFO_MSG("Reloading input..."); - execvp(argv[0], argv); - FAIL_MSG("Error reloading: %s", strerror(errno)); + if (!udpSrv){ + Util::logExitReason(ER_READ_START_FAILURE, "Failure to open listening socket"); + conf.is_active = false; + return; + } + Util::Config::setServerFD(0); + udpSrv.allocateDestination(); + + Util::Procs::socketList.insert(udpSrv.getSock()); + int maxFD = udpSrv.getSock(); + while (conf.is_active && udpSrv){ + + fd_set rfds; + FD_ZERO(&rfds); + FD_SET(maxFD, &rfds); + + struct timeval T; + T.tv_sec = 2; + T.tv_usec = 0; + int r = select(maxFD + 1, &rfds, NULL, NULL, &T); + if (r){ + while(udpSrv.Receive()){ + // Ignore if it's not an SRT handshake packet + if (udpSrv.data.size() >= 4 && udpSrv.data[0] == 0x80 && !udpSrv.data[1] && !udpSrv.data[2] && !udpSrv.data[3]){ + bool rendezvous = false; + if (udpSrv.data.size() >= 40){ + rendezvous = (!udpSrv.data[36] && !udpSrv.data[37] && !udpSrv.data[38] && !udpSrv.data[39]); + } + std::string remoteIP, localIP; + uint32_t remotePort, localPort; + udpSrv.GetDestination(remoteIP, remotePort); + udpSrv.GetLocalDestination(localIP, localPort); + INFO_MSG("SRT handshake from %s:%" PRIu32 "! Spawning child process to handle it...", remoteIP.c_str(), remotePort); + if (!fork()){ + Socket::UDPConnection s(udpSrv); + udpSrv.close(); + if (!s.connect()){return;} + return initSRTConnection(s, arguments, !rendezvous); + } + } + } + } } } - INFO_MSG("Exit reason: %s", Util::exitReason); - Socket::SRT::libraryCleanup(); - return 0; -} + +}// namespace Mist + diff --git a/src/output/output_tssrt.h b/src/output/output_tssrt.h index 9804c3e9..ce824fca 100644 --- a/src/output/output_tssrt.h +++ b/src/output/output_tssrt.h @@ -5,25 +5,29 @@ namespace Mist{ class OutTSSRT : public TSOutput{ public: - OutTSSRT(Socket::Connection &conn, Socket::SRTConnection & _srtSock); + OutTSSRT(Socket::Connection &conn, Socket::SRTConnection * _srtSock = 0); ~OutTSSRT(); - static bool listenMode(){return !(config->getString("target").size());} + static bool listenMode(); + static void listener(Util::Config &conf, int (*callback)(Socket::Connection &S)); static void init(Util::Config *cfg); void sendTS(const char *tsData, size_t len = 188); bool isReadyForPlay(){return true;} virtual void requestHandler(); virtual bool onFinish(); + virtual void initialSeek(bool dryRun = false); + inline virtual bool keepGoing(){return config->is_active;} protected: virtual void connStats(uint64_t now, Comms::Connections &statComm); - virtual std::string getConnectedHost(){return srtConn.remotehost;} - virtual std::string getConnectedBinHost(){return srtConn.getBinHost();} + virtual std::string getConnectedHost(){return srtConn?srtConn->remotehost:"";} + virtual std::string getConnectedBinHost(){return srtConn?srtConn->getBinHost():"";} virtual bool dropPushTrack(uint32_t trackId, const std::string & dropReason); private: HTTP::URL target; int64_t timeStampOffset; uint64_t lastTimeStamp; + uint64_t lastWorked; bool pushOut; Util::ResizeablePointer packetBuffer; Socket::UDPConnection pushSock; @@ -31,7 +35,8 @@ namespace Mist{ TS::Assembler assembler; bool bootMSOffsetCalculated; - Socket::SRTConnection & srtConn; + Socket::SRTConnection * srtConn; + Socket::UDPConnection * udpInit; }; }// namespace Mist