From 2485c16dfcd81638d0137b872ab9357341062400 Mon Sep 17 00:00:00 2001 From: Marco van Dijk Date: Tue, 26 Apr 2022 15:40:58 +0200 Subject: [PATCH] FEC --- lib/rtp.cpp | 219 +++++++++++++++++++++++++++++++++++++++ lib/rtp.h | 37 +++++++ src/output/output_ts.cpp | 68 ++++++++++-- src/output/output_ts.h | 9 +- 4 files changed, 325 insertions(+), 8 deletions(-) diff --git a/lib/rtp.cpp b/lib/rtp.cpp index b490050b..3ed9b7c6 100644 --- a/lib/rtp.cpp +++ b/lib/rtp.cpp @@ -55,6 +55,222 @@ namespace RTP{ void Packet::increaseSequence(){setSequence(getSequence() + 1);} + /// \brief Enables Pro-MPEG FEC with the specified amount of rows and columns + bool Packet::configureFEC(uint8_t rows, uint8_t columns){ + if (rows < 4 || rows > 20){ + ERROR_MSG("Rows should have a value between 4-20"); + return false; + } else if (columns < 1 || columns > 20){ + ERROR_MSG("Columns should have a value between 1-20"); + return false; + } else if (rows * columns > 100){ + ERROR_MSG("The product of rows * columns cannot exceed 100"); + return false; + } + fecEnabled = true; + fecContext.needsInit = true; + fecContext.rows = rows; + fecContext.columns = columns; + fecContext.maxIndex = rows * columns; + INFO_MSG("Enabling 2d-fec with %u rows and %u columns", rows, columns); + return true; + } + + void Packet::initFEC(uint64_t bufSize){ + fecContext.needsInit = false; + fecContext.isFirst = true; + fecContext.index = 0; + fecContext.pktSize = bufSize; + fecContext.lengthRecovery = bufSize - 12; + // Add room for FEC and RTP header + fecContext.rtpBufSize = fecContext.lengthRecovery + 28; + // Add room for P, X, CC, M, PT, SN, TS fields + fecContext.bitstringSize = fecContext.lengthRecovery + 8; + fecContext.fecBufferRows.bitstring = 0; + fecContext.fecBufferColumns.clear(); + fecContext.columnSN = 0; + fecContext.rowSN = 0; + } + + /// \brief Takes an RTP packet containing TS packets and returns the modified payload + void Packet::generateBitstring(const char *payload, unsigned int payloadlen, uint8_t *bitstring){ + // Write 8 bits of header data (P, X, CC, M, PT, timestamp) + bitstring[0] = data[0] & 0x3f; + bitstring[1] = data[1]; + bitstring[2] = data[4]; + bitstring[3] = data[5]; + bitstring[4] = data[6]; + bitstring[5] = data[7]; + // Set length recovery + bitstring[7] = fecContext.lengthRecovery; + bitstring[6] = fecContext.lengthRecovery >> 8; + // Append payload of RTP packet + memcpy(bitstring + 8, payload, fecContext.lengthRecovery); + } + + void Packet::applyXOR(const uint8_t *in1, const uint8_t *in2, uint8_t *out, uint64_t size){ + uint64_t index = 0; + for (index = 0; index < size; index++) { + out[index] = in1[index] ^ in2[index]; + } + } + + /// \brief Sends buffered FEC packets + /// \param socket UDP socket ready to send packets + /// \param buf bitstring we want to contain in a FEC packet + /// \param isColumn whether the buf we want to send represents a completed column or row + void Packet::sendFec(void *socket, FecData *fecData, bool isColumn){ + uint8_t *data = fecData->bitstring; + // Create zero filled buffer + uint8_t *rtpBuf = (uint8_t *)malloc(fecContext.rtpBufSize); + memset(rtpBuf, 0, fecContext.rtpBufSize); + uint16_t thisSN = isColumn ? ++fecContext.columnSN : ++fecContext.rowSN; + + // V, P, X, CC + rtpBuf[0] = 0x80 | (data[0] & 0x3f); + // M, PT + rtpBuf[1] = (data[1] & 0x80) | 0x60; + // SN + rtpBuf[3] = thisSN; + rtpBuf[2] = thisSN >> 8; + // TS + rtpBuf[7] = fecData->timestamp; + rtpBuf[6] = fecData->timestamp >> 8; + rtpBuf[5] = fecData->timestamp >> 16; + rtpBuf[4] = fecData->timestamp >> 24; + // Keep SSRC 0 and skip CSRC + + // SNBase low (lowest sequence number of the sequence of RTP packets in this FEC packet) + rtpBuf[13] = fecData->sequence; + rtpBuf[12] = fecData->sequence >> 8; + // Length recovery + rtpBuf[14] = data[6]; + rtpBuf[15] = data[7]; + // E=1, PT recovery + rtpBuf[16] = 0x80 | data[1]; + // Keep Mask 0 + // TS recovery + rtpBuf[20] = data[2]; + rtpBuf[21] = data[3]; + rtpBuf[22] = data[4]; + rtpBuf[23] = data[5]; + // X=0, D, type=0, index=0 + rtpBuf[24] = isColumn ? 0x0 : 0x40; + // offset (number of columns) + rtpBuf[25] = isColumn ? fecContext.columns : 0x1; + // NA (number of rows) + rtpBuf[26] = isColumn ? fecContext.rows : fecContext.columns; + // Keep SNBase ext bits 0 + // Payload + memcpy(rtpBuf + 28, data + 8, fecContext.lengthRecovery); + + ((Socket::UDPConnection *)socket)->SendNow(reinterpret_cast(rtpBuf), fecContext.rtpBufSize); + sentPackets++; + sentBytes += fecContext.rtpBufSize; + free(rtpBuf); + } + + /// \brief Parses new RTP packets + void Packet::parseFEC(void *columnSocket, void *rowSocket, uint64_t & bytesSent, const char *payload, unsigned int payloadlen){ + if (!fecEnabled){ + return; + } + uint8_t *bitstring; + uint8_t thisColumn; + uint8_t thisRow; + // Check to see if we need to reinit FEC data + if (fecContext.needsInit){ + // Add space for the RTP header + initFEC(payloadlen + 12); + } + // Check the buffer size which should be constant + if (payloadlen != fecContext.lengthRecovery){ + WARN_MSG("RTP packet size should be constant, expected %u but got %u", fecContext.lengthRecovery, payloadlen); + return; + } + // Create bitstring + bitstring = (uint8_t *)malloc(fecContext.pktSize); + generateBitstring(payload, payloadlen, bitstring); + + thisColumn = fecContext.index % fecContext.columns; + thisRow = (fecContext.index / fecContext.columns) % fecContext.rows; + // Check for completed rows of data + if (thisColumn == 0){ + // Double check if we have a final FEC row of data before sending it + if (!fecContext.isFirst || fecContext.index > 0){ + if (thisRow == 0){ + INSANE_MSG("Sending completed FEC packet at row %u", fecContext.rows - 1); + } else { + INSANE_MSG("Sending completed FEC packet at row %u", thisRow - 1); + } + sendFec(rowSocket, &fecContext.fecBufferRows, false); + bytesSent += fecContext.rtpBufSize; + } + free(fecContext.fecBufferRows.bitstring); + fecContext.fecBufferRows.bitstring = bitstring; + // Set the SN and TS of this first packet in the sequence + fecContext.fecBufferRows.sequence = getSequence() - 1; + fecContext.fecBufferRows.timestamp = getTimeStamp(); + } else { + // This is an intermediate packet, apply XOR operation and continue + applyXOR(fecContext.fecBufferRows.bitstring, bitstring, fecContext.fecBufferRows.bitstring, fecContext.bitstringSize); + } + // XOR or set new bitstring + if (thisRow == 0){ + // Make a copy if we are already using this bitstring for the FEC row + if (thisColumn == 0){ + uint8_t *bitstringCopy; + bitstringCopy = (uint8_t *)malloc(fecContext.pktSize); + memcpy(bitstringCopy, bitstring, fecContext.pktSize); + fecContext.fecBufferColumns[thisColumn].bitstring = bitstringCopy; + } else { + fecContext.fecBufferColumns[thisColumn].bitstring = bitstring; + } + fecContext.fecBufferColumns[thisColumn].sequence = getSequence() - 1; + fecContext.fecBufferColumns[thisColumn].timestamp = getTimeStamp(); + } else { + // This is an intermediate packet, apply XOR operation and continue + applyXOR(fecContext.fecBufferColumns[thisColumn].bitstring, bitstring, fecContext.fecBufferColumns[thisColumn].bitstring, fecContext.bitstringSize); + } + + // Check for completed columns of data + if (thisRow == fecContext.rows - 1){ + INSANE_MSG("Sending completed FEC packet at column %u", thisColumn); + sendFec(columnSocket, &fecContext.fecBufferColumns[thisColumn], true); + bytesSent += fecContext.rtpBufSize; + free(fecContext.fecBufferColumns[thisColumn].bitstring); + } + + // Update variables + fecContext.index++; + if (fecContext.index >= fecContext.maxIndex){ + fecContext.isFirst = false; + fecContext.index = 0; + } + } + + void Packet::sendNoPacket(unsigned int payloadlen){ + // Increment counters + sentPackets++; + sentBytes += payloadlen + getHsize(); + setTimestamp(Util::bootMS()); + increaseSequence(); + } + + void Packet::sendTS(void *socket, const char *payload, unsigned int payloadlen){ + // Add TS payload + memcpy(data + getHsize(), payload, payloadlen); + INSANE_MSG("Sending RTP packet with header size %u and payload size %u", getHsize(), payloadlen); + // Set timestamp to current time + setTimestamp(Util::bootMS()*90); + // Send RTP packet itself + ((Socket::UDPConnection *)socket)->SendNow(data, getHsize() + payloadlen); + // Increment counters + sentPackets++; + sentBytes += payloadlen + getHsize(); + increaseSequence(); + } + void Packet::sendH264(void *socket, void callBack(void *, const char *, size_t, uint8_t), const char *payload, uint32_t payloadlen, uint32_t channel, bool lastOfAccesUnit){ if ((payload[0] & 0x1F) == 12){return;} @@ -347,6 +563,7 @@ namespace RTP{ maxDataLen = 0; sentBytes = 0; sentPackets = 0; + fecEnabled = false; } Packet::Packet(uint32_t payloadType, uint32_t sequence, uint64_t timestamp, uint32_t ssrc, uint32_t csrcCount){ @@ -365,6 +582,7 @@ namespace RTP{ setSSRC(ssrc); sentBytes = 0; sentPackets = 0; + fecEnabled = false; } Packet::Packet(const Packet &o){ @@ -418,6 +636,7 @@ namespace RTP{ maxDataLen = len; sentBytes = 0; sentPackets = 0; + fecEnabled = false; data = (char *)dat; } diff --git a/lib/rtp.h b/lib/rtp.h index dc782936..3d5bbb53 100644 --- a/lib/rtp.h +++ b/lib/rtp.h @@ -29,6 +29,33 @@ namespace RTP{ extern unsigned int PACKET_REORDER_WAIT; extern unsigned int PACKET_DROP_TIMEOUT; + struct FecData{ + public: + uint16_t sequence; + uint32_t timestamp; + uint8_t *bitstring; + }; + + struct FEC{ + public: + bool needsInit; + bool isFirst; + uint16_t maxIndex; + // Track the amount of row/column FEC packets were sent, as they have their own index + uint16_t columnSN; + uint16_t rowSN; + // Determines what row/column of FEC data we are currently on + uint8_t rows; + uint8_t columns; + uint16_t index; + uint16_t lengthRecovery; + uint32_t pktSize; + uint32_t rtpBufSize; + uint32_t bitstringSize; + FecData fecBufferRows; // Stores intermediate results or XOR'd RTP packets + std::map fecBufferColumns; + }; + /// This class is used to make RTP packets. Currently, H264, and AAC are supported. RTP /// mechanisms, like increasing sequence numbers and setting timestamps are all taken care of in /// here. @@ -39,6 +66,8 @@ namespace RTP{ uint32_t maxDataLen; ///< Amount of reserved bytes for the packet(s) uint32_t sentPackets; uint32_t sentBytes; // Because ugly is beautiful + bool fecEnabled; + FEC fecContext; public: static double startRTCP; uint32_t getHsize() const; @@ -58,6 +87,14 @@ namespace RTP{ void setTimestamp(uint32_t t); void increaseSequence(); + void initFEC(uint64_t bufSize); + void applyXOR(const uint8_t *in1, const uint8_t *in2, uint8_t *out, uint64_t size); + void generateBitstring(const char *payload, unsigned int payloadlen, uint8_t *bitstring); + bool configureFEC(uint8_t rows, uint8_t columns); + void sendFec(void *socket, FecData *fecData, bool isColumn); + void parseFEC(void *columnSocket, void *rowSocket, uint64_t & bytesSent, const char *payload, unsigned int payloadlen); + void sendNoPacket(unsigned int payloadlen); + void sendTS(void *socket, const char *payload, unsigned int payloadlen); void sendH264(void *socket, void callBack(void *, const char *, size_t, uint8_t), const char *payload, unsigned int payloadlen, unsigned int channel, bool lastOfAccessUnit); void sendVP8(void *socket, void callBack(void *, const char *, size_t, uint8_t), diff --git a/src/output/output_ts.cpp b/src/output/output_ts.cpp index bb143b18..e7b01005 100644 --- a/src/output/output_ts.cpp +++ b/src/output/output_ts.cpp @@ -10,12 +10,15 @@ namespace Mist{ sendRepeatingHeaders = 500; // PAT/PMT every 500ms (DVB spec) streamName = config->getString("streamname"); pushOut = false; + sendFEC = false; + wrapRTP = false; + dropPercentage = 0; std::string tracks = config->getString("tracks"); if (config->getString("target").size()){ HTTP::URL target(config->getString("target")); - if (target.protocol != "tsudp"){ - FAIL_MSG("Target %s must begin with tsudp://, aborting", target.getUrl().c_str()); - onFail("Invalid ts udp target: doesn't start with tsudp://", true); + if (target.protocol != "tsudp" && target.protocol != "tsrtp"){ + FAIL_MSG("Target %s must begin with tsudp:// or tsrtp://, aborting", target.getUrl().c_str()); + onFail("Invalid ts udp target: doesn't start with tsudp:// or tsrtp://", true); return; } if (!target.getPort()){ @@ -23,8 +26,40 @@ namespace Mist{ onFail("Invalid ts udp target: missing port", true); return; } + // Wrap TS packets inside an RTP packet + if (target.protocol == "tsrtp"){ + // MP2T payload, no CSRC list and init to sequence number 1, random SSRC and random timestamp + tsOut = RTP::Packet(33, 1, rand(), rand()); + wrapRTP = true; + } + if (wrapRTP && targetParams.count("fec")){ + if (targetParams.at("fec") == "prompeg"){ + uint8_t rows = 8; + uint8_t columns = 4; + if (targetParams.count("rows")){ + rows = atoi(targetParams.at("rows").c_str()); + } + if (targetParams.count("columns")){ + columns = atoi(targetParams.at("columns").c_str()); + } + if (tsOut.configureFEC(rows, columns)){ + // Send Pro-MPEG FEC columns over port number + 2 + fecColumnSock.SetDestination(target.host, target.getPort() + 2); + // Send Pro-MPEG FEC rows over port number + 4 + fecRowSock.SetDestination(target.host, target.getPort() + 4); + sendFEC = true; + } else { + WARN_MSG("Failed to configure FEC. Running without forward error correction"); + } + }else{ + WARN_MSG("Unsupported FEC of name '%s'. Running without forward error correction", targetParams.at("fec").c_str()); + } + } + if (targetParams.count("drop")){ + dropPercentage = atoi(targetParams.at("drop").c_str()); + } pushOut = true; - udpSize = 5; + udpSize = 7; if (targetParams.count("tracks")){tracks = targetParams["tracks"];} if (targetParams.count("pkts")){udpSize = atoi(targetParams["pkts"].c_str());} packetBuffer.reserve(188 * udpSize); @@ -131,12 +166,13 @@ namespace Mist{ cfg->addConnectorOptions(8888, capa); config = cfg; capa["push_urls"].append("tsudp://*"); + capa["push_urls"].append("tsrtp://*"); JSON::Value opt; opt["arg"] = "string"; opt["default"] = ""; opt["arg_num"] = 1; - opt["help"] = "Target tsudp:// URL to push out towards."; + opt["help"] = "Target tsudp:// or tsrtp:// URL to push out towards."; cfg->addOption("target", opt); } @@ -150,8 +186,26 @@ namespace Mist{ if (pushOut){ static size_t curFilled = 0; if (curFilled == udpSize){ - pushSock.SendNow(packetBuffer); - myConn.addUp(packetBuffer.size()); + // in MPEG-TS over RTP mode, wrap TS packets in a RTP header + if (wrapRTP){ + // Send RTP packet itself + if (rand() % 100 >= dropPercentage){ + tsOut.sendTS(&pushSock, packetBuffer.c_str(), packetBuffer.size()); + myConn.addUp(tsOut.getHsize() + tsOut.getPayloadSize()); + } else { + INFO_MSG("Dropping RTP packet in order to simulate packet loss"); + tsOut.sendNoPacket(packetBuffer.size()); + } + if (sendFEC){ + // Send FEC packet if available + uint64_t bytesSent = 0; + tsOut.parseFEC(&fecColumnSock, &fecRowSock, bytesSent, packetBuffer.c_str(), packetBuffer.size()); + myConn.addUp(bytesSent); + } + }else{ + pushSock.SendNow(packetBuffer); + myConn.addUp(packetBuffer.size()); + } packetBuffer.clear(); packetBuffer.reserve(udpSize * len); curFilled = 0; diff --git a/src/output/output_ts.h b/src/output/output_ts.h index 1b2457c5..32aa6958 100644 --- a/src/output/output_ts.h +++ b/src/output/output_ts.h @@ -1,6 +1,6 @@ #include "output_ts_base.h" #include - +#include namespace Mist{ class OutTS : public TSOutput{ public: @@ -16,10 +16,17 @@ namespace Mist{ private: size_t udpSize; bool pushOut; + bool wrapRTP; + bool sendFEC; + void onRTP(void *socket, const char *data, size_t nbytes); std::string packetBuffer; Socket::UDPConnection pushSock; + Socket::UDPConnection fecColumnSock; + Socket::UDPConnection fecRowSock; + uint8_t dropPercentage; TS::Stream tsIn; std::string getStatsName(); + RTP::Packet tsOut; protected: inline virtual bool keepGoing(){