This commit is contained in:
Marco van Dijk 2022-04-26 15:40:58 +02:00 committed by Thulinma
parent bd283fab1d
commit 2485c16dfc
4 changed files with 325 additions and 8 deletions

View file

@ -55,6 +55,222 @@ namespace RTP{
void Packet::increaseSequence(){setSequence(getSequence() + 1);}
/// \brief Enables Pro-MPEG FEC with the specified amount of rows and columns
bool Packet::configureFEC(uint8_t rows, uint8_t columns){
if (rows < 4 || rows > 20){
ERROR_MSG("Rows should have a value between 4-20");
return false;
} else if (columns < 1 || columns > 20){
ERROR_MSG("Columns should have a value between 1-20");
return false;
} else if (rows * columns > 100){
ERROR_MSG("The product of rows * columns cannot exceed 100");
return false;
}
fecEnabled = true;
fecContext.needsInit = true;
fecContext.rows = rows;
fecContext.columns = columns;
fecContext.maxIndex = rows * columns;
INFO_MSG("Enabling 2d-fec with %u rows and %u columns", rows, columns);
return true;
}
void Packet::initFEC(uint64_t bufSize){
fecContext.needsInit = false;
fecContext.isFirst = true;
fecContext.index = 0;
fecContext.pktSize = bufSize;
fecContext.lengthRecovery = bufSize - 12;
// Add room for FEC and RTP header
fecContext.rtpBufSize = fecContext.lengthRecovery + 28;
// Add room for P, X, CC, M, PT, SN, TS fields
fecContext.bitstringSize = fecContext.lengthRecovery + 8;
fecContext.fecBufferRows.bitstring = 0;
fecContext.fecBufferColumns.clear();
fecContext.columnSN = 0;
fecContext.rowSN = 0;
}
/// \brief Takes an RTP packet containing TS packets and returns the modified payload
void Packet::generateBitstring(const char *payload, unsigned int payloadlen, uint8_t *bitstring){
// Write 8 bits of header data (P, X, CC, M, PT, timestamp)
bitstring[0] = data[0] & 0x3f;
bitstring[1] = data[1];
bitstring[2] = data[4];
bitstring[3] = data[5];
bitstring[4] = data[6];
bitstring[5] = data[7];
// Set length recovery
bitstring[7] = fecContext.lengthRecovery;
bitstring[6] = fecContext.lengthRecovery >> 8;
// Append payload of RTP packet
memcpy(bitstring + 8, payload, fecContext.lengthRecovery);
}
void Packet::applyXOR(const uint8_t *in1, const uint8_t *in2, uint8_t *out, uint64_t size){
uint64_t index = 0;
for (index = 0; index < size; index++) {
out[index] = in1[index] ^ in2[index];
}
}
/// \brief Sends buffered FEC packets
/// \param socket UDP socket ready to send packets
/// \param buf bitstring we want to contain in a FEC packet
/// \param isColumn whether the buf we want to send represents a completed column or row
void Packet::sendFec(void *socket, FecData *fecData, bool isColumn){
uint8_t *data = fecData->bitstring;
// Create zero filled buffer
uint8_t *rtpBuf = (uint8_t *)malloc(fecContext.rtpBufSize);
memset(rtpBuf, 0, fecContext.rtpBufSize);
uint16_t thisSN = isColumn ? ++fecContext.columnSN : ++fecContext.rowSN;
// V, P, X, CC
rtpBuf[0] = 0x80 | (data[0] & 0x3f);
// M, PT
rtpBuf[1] = (data[1] & 0x80) | 0x60;
// SN
rtpBuf[3] = thisSN;
rtpBuf[2] = thisSN >> 8;
// TS
rtpBuf[7] = fecData->timestamp;
rtpBuf[6] = fecData->timestamp >> 8;
rtpBuf[5] = fecData->timestamp >> 16;
rtpBuf[4] = fecData->timestamp >> 24;
// Keep SSRC 0 and skip CSRC
// SNBase low (lowest sequence number of the sequence of RTP packets in this FEC packet)
rtpBuf[13] = fecData->sequence;
rtpBuf[12] = fecData->sequence >> 8;
// Length recovery
rtpBuf[14] = data[6];
rtpBuf[15] = data[7];
// E=1, PT recovery
rtpBuf[16] = 0x80 | data[1];
// Keep Mask 0
// TS recovery
rtpBuf[20] = data[2];
rtpBuf[21] = data[3];
rtpBuf[22] = data[4];
rtpBuf[23] = data[5];
// X=0, D, type=0, index=0
rtpBuf[24] = isColumn ? 0x0 : 0x40;
// offset (number of columns)
rtpBuf[25] = isColumn ? fecContext.columns : 0x1;
// NA (number of rows)
rtpBuf[26] = isColumn ? fecContext.rows : fecContext.columns;
// Keep SNBase ext bits 0
// Payload
memcpy(rtpBuf + 28, data + 8, fecContext.lengthRecovery);
((Socket::UDPConnection *)socket)->SendNow(reinterpret_cast<char*>(rtpBuf), fecContext.rtpBufSize);
sentPackets++;
sentBytes += fecContext.rtpBufSize;
free(rtpBuf);
}
/// \brief Parses new RTP packets
void Packet::parseFEC(void *columnSocket, void *rowSocket, uint64_t & bytesSent, const char *payload, unsigned int payloadlen){
if (!fecEnabled){
return;
}
uint8_t *bitstring;
uint8_t thisColumn;
uint8_t thisRow;
// Check to see if we need to reinit FEC data
if (fecContext.needsInit){
// Add space for the RTP header
initFEC(payloadlen + 12);
}
// Check the buffer size which should be constant
if (payloadlen != fecContext.lengthRecovery){
WARN_MSG("RTP packet size should be constant, expected %u but got %u", fecContext.lengthRecovery, payloadlen);
return;
}
// Create bitstring
bitstring = (uint8_t *)malloc(fecContext.pktSize);
generateBitstring(payload, payloadlen, bitstring);
thisColumn = fecContext.index % fecContext.columns;
thisRow = (fecContext.index / fecContext.columns) % fecContext.rows;
// Check for completed rows of data
if (thisColumn == 0){
// Double check if we have a final FEC row of data before sending it
if (!fecContext.isFirst || fecContext.index > 0){
if (thisRow == 0){
INSANE_MSG("Sending completed FEC packet at row %u", fecContext.rows - 1);
} else {
INSANE_MSG("Sending completed FEC packet at row %u", thisRow - 1);
}
sendFec(rowSocket, &fecContext.fecBufferRows, false);
bytesSent += fecContext.rtpBufSize;
}
free(fecContext.fecBufferRows.bitstring);
fecContext.fecBufferRows.bitstring = bitstring;
// Set the SN and TS of this first packet in the sequence
fecContext.fecBufferRows.sequence = getSequence() - 1;
fecContext.fecBufferRows.timestamp = getTimeStamp();
} else {
// This is an intermediate packet, apply XOR operation and continue
applyXOR(fecContext.fecBufferRows.bitstring, bitstring, fecContext.fecBufferRows.bitstring, fecContext.bitstringSize);
}
// XOR or set new bitstring
if (thisRow == 0){
// Make a copy if we are already using this bitstring for the FEC row
if (thisColumn == 0){
uint8_t *bitstringCopy;
bitstringCopy = (uint8_t *)malloc(fecContext.pktSize);
memcpy(bitstringCopy, bitstring, fecContext.pktSize);
fecContext.fecBufferColumns[thisColumn].bitstring = bitstringCopy;
} else {
fecContext.fecBufferColumns[thisColumn].bitstring = bitstring;
}
fecContext.fecBufferColumns[thisColumn].sequence = getSequence() - 1;
fecContext.fecBufferColumns[thisColumn].timestamp = getTimeStamp();
} else {
// This is an intermediate packet, apply XOR operation and continue
applyXOR(fecContext.fecBufferColumns[thisColumn].bitstring, bitstring, fecContext.fecBufferColumns[thisColumn].bitstring, fecContext.bitstringSize);
}
// Check for completed columns of data
if (thisRow == fecContext.rows - 1){
INSANE_MSG("Sending completed FEC packet at column %u", thisColumn);
sendFec(columnSocket, &fecContext.fecBufferColumns[thisColumn], true);
bytesSent += fecContext.rtpBufSize;
free(fecContext.fecBufferColumns[thisColumn].bitstring);
}
// Update variables
fecContext.index++;
if (fecContext.index >= fecContext.maxIndex){
fecContext.isFirst = false;
fecContext.index = 0;
}
}
void Packet::sendNoPacket(unsigned int payloadlen){
// Increment counters
sentPackets++;
sentBytes += payloadlen + getHsize();
setTimestamp(Util::bootMS());
increaseSequence();
}
void Packet::sendTS(void *socket, const char *payload, unsigned int payloadlen){
// Add TS payload
memcpy(data + getHsize(), payload, payloadlen);
INSANE_MSG("Sending RTP packet with header size %u and payload size %u", getHsize(), payloadlen);
// Set timestamp to current time
setTimestamp(Util::bootMS()*90);
// Send RTP packet itself
((Socket::UDPConnection *)socket)->SendNow(data, getHsize() + payloadlen);
// Increment counters
sentPackets++;
sentBytes += payloadlen + getHsize();
increaseSequence();
}
void Packet::sendH264(void *socket, void callBack(void *, const char *, size_t, uint8_t),
const char *payload, uint32_t payloadlen, uint32_t channel, bool lastOfAccesUnit){
if ((payload[0] & 0x1F) == 12){return;}
@ -347,6 +563,7 @@ namespace RTP{
maxDataLen = 0;
sentBytes = 0;
sentPackets = 0;
fecEnabled = false;
}
Packet::Packet(uint32_t payloadType, uint32_t sequence, uint64_t timestamp, uint32_t ssrc, uint32_t csrcCount){
@ -365,6 +582,7 @@ namespace RTP{
setSSRC(ssrc);
sentBytes = 0;
sentPackets = 0;
fecEnabled = false;
}
Packet::Packet(const Packet &o){
@ -418,6 +636,7 @@ namespace RTP{
maxDataLen = len;
sentBytes = 0;
sentPackets = 0;
fecEnabled = false;
data = (char *)dat;
}

View file

@ -29,6 +29,33 @@ namespace RTP{
extern unsigned int PACKET_REORDER_WAIT;
extern unsigned int PACKET_DROP_TIMEOUT;
struct FecData{
public:
uint16_t sequence;
uint32_t timestamp;
uint8_t *bitstring;
};
struct FEC{
public:
bool needsInit;
bool isFirst;
uint16_t maxIndex;
// Track the amount of row/column FEC packets were sent, as they have their own index
uint16_t columnSN;
uint16_t rowSN;
// Determines what row/column of FEC data we are currently on
uint8_t rows;
uint8_t columns;
uint16_t index;
uint16_t lengthRecovery;
uint32_t pktSize;
uint32_t rtpBufSize;
uint32_t bitstringSize;
FecData fecBufferRows; // Stores intermediate results or XOR'd RTP packets
std::map<uint8_t, FecData> fecBufferColumns;
};
/// This class is used to make RTP packets. Currently, H264, and AAC are supported. RTP
/// mechanisms, like increasing sequence numbers and setting timestamps are all taken care of in
/// here.
@ -39,6 +66,8 @@ namespace RTP{
uint32_t maxDataLen; ///< Amount of reserved bytes for the packet(s)
uint32_t sentPackets;
uint32_t sentBytes; // Because ugly is beautiful
bool fecEnabled;
FEC fecContext;
public:
static double startRTCP;
uint32_t getHsize() const;
@ -58,6 +87,14 @@ namespace RTP{
void setTimestamp(uint32_t t);
void increaseSequence();
void initFEC(uint64_t bufSize);
void applyXOR(const uint8_t *in1, const uint8_t *in2, uint8_t *out, uint64_t size);
void generateBitstring(const char *payload, unsigned int payloadlen, uint8_t *bitstring);
bool configureFEC(uint8_t rows, uint8_t columns);
void sendFec(void *socket, FecData *fecData, bool isColumn);
void parseFEC(void *columnSocket, void *rowSocket, uint64_t & bytesSent, const char *payload, unsigned int payloadlen);
void sendNoPacket(unsigned int payloadlen);
void sendTS(void *socket, const char *payload, unsigned int payloadlen);
void sendH264(void *socket, void callBack(void *, const char *, size_t, uint8_t), const char *payload,
unsigned int payloadlen, unsigned int channel, bool lastOfAccessUnit);
void sendVP8(void *socket, void callBack(void *, const char *, size_t, uint8_t),

View file

@ -10,12 +10,15 @@ namespace Mist{
sendRepeatingHeaders = 500; // PAT/PMT every 500ms (DVB spec)
streamName = config->getString("streamname");
pushOut = false;
sendFEC = false;
wrapRTP = false;
dropPercentage = 0;
std::string tracks = config->getString("tracks");
if (config->getString("target").size()){
HTTP::URL target(config->getString("target"));
if (target.protocol != "tsudp"){
FAIL_MSG("Target %s must begin with tsudp://, aborting", target.getUrl().c_str());
onFail("Invalid ts udp target: doesn't start with tsudp://", true);
if (target.protocol != "tsudp" && target.protocol != "tsrtp"){
FAIL_MSG("Target %s must begin with tsudp:// or tsrtp://, aborting", target.getUrl().c_str());
onFail("Invalid ts udp target: doesn't start with tsudp:// or tsrtp://", true);
return;
}
if (!target.getPort()){
@ -23,8 +26,40 @@ namespace Mist{
onFail("Invalid ts udp target: missing port", true);
return;
}
// Wrap TS packets inside an RTP packet
if (target.protocol == "tsrtp"){
// MP2T payload, no CSRC list and init to sequence number 1, random SSRC and random timestamp
tsOut = RTP::Packet(33, 1, rand(), rand());
wrapRTP = true;
}
if (wrapRTP && targetParams.count("fec")){
if (targetParams.at("fec") == "prompeg"){
uint8_t rows = 8;
uint8_t columns = 4;
if (targetParams.count("rows")){
rows = atoi(targetParams.at("rows").c_str());
}
if (targetParams.count("columns")){
columns = atoi(targetParams.at("columns").c_str());
}
if (tsOut.configureFEC(rows, columns)){
// Send Pro-MPEG FEC columns over port number + 2
fecColumnSock.SetDestination(target.host, target.getPort() + 2);
// Send Pro-MPEG FEC rows over port number + 4
fecRowSock.SetDestination(target.host, target.getPort() + 4);
sendFEC = true;
} else {
WARN_MSG("Failed to configure FEC. Running without forward error correction");
}
}else{
WARN_MSG("Unsupported FEC of name '%s'. Running without forward error correction", targetParams.at("fec").c_str());
}
}
if (targetParams.count("drop")){
dropPercentage = atoi(targetParams.at("drop").c_str());
}
pushOut = true;
udpSize = 5;
udpSize = 7;
if (targetParams.count("tracks")){tracks = targetParams["tracks"];}
if (targetParams.count("pkts")){udpSize = atoi(targetParams["pkts"].c_str());}
packetBuffer.reserve(188 * udpSize);
@ -131,12 +166,13 @@ namespace Mist{
cfg->addConnectorOptions(8888, capa);
config = cfg;
capa["push_urls"].append("tsudp://*");
capa["push_urls"].append("tsrtp://*");
JSON::Value opt;
opt["arg"] = "string";
opt["default"] = "";
opt["arg_num"] = 1;
opt["help"] = "Target tsudp:// URL to push out towards.";
opt["help"] = "Target tsudp:// or tsrtp:// URL to push out towards.";
cfg->addOption("target", opt);
}
@ -150,8 +186,26 @@ namespace Mist{
if (pushOut){
static size_t curFilled = 0;
if (curFilled == udpSize){
pushSock.SendNow(packetBuffer);
myConn.addUp(packetBuffer.size());
// in MPEG-TS over RTP mode, wrap TS packets in a RTP header
if (wrapRTP){
// Send RTP packet itself
if (rand() % 100 >= dropPercentage){
tsOut.sendTS(&pushSock, packetBuffer.c_str(), packetBuffer.size());
myConn.addUp(tsOut.getHsize() + tsOut.getPayloadSize());
} else {
INFO_MSG("Dropping RTP packet in order to simulate packet loss");
tsOut.sendNoPacket(packetBuffer.size());
}
if (sendFEC){
// Send FEC packet if available
uint64_t bytesSent = 0;
tsOut.parseFEC(&fecColumnSock, &fecRowSock, bytesSent, packetBuffer.c_str(), packetBuffer.size());
myConn.addUp(bytesSent);
}
}else{
pushSock.SendNow(packetBuffer);
myConn.addUp(packetBuffer.size());
}
packetBuffer.clear();
packetBuffer.reserve(udpSize * len);
curFilled = 0;

View file

@ -1,6 +1,6 @@
#include "output_ts_base.h"
#include <mist/ts_stream.h>
#include <mist/rtp.h>
namespace Mist{
class OutTS : public TSOutput{
public:
@ -16,10 +16,17 @@ namespace Mist{
private:
size_t udpSize;
bool pushOut;
bool wrapRTP;
bool sendFEC;
void onRTP(void *socket, const char *data, size_t nbytes);
std::string packetBuffer;
Socket::UDPConnection pushSock;
Socket::UDPConnection fecColumnSock;
Socket::UDPConnection fecRowSock;
uint8_t dropPercentage;
TS::Stream tsIn;
std::string getStatsName();
RTP::Packet tsOut;
protected:
inline virtual bool keepGoing(){