diff --git a/CMakeLists.txt b/CMakeLists.txt index e37ea1c0..51dca4da 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -462,6 +462,9 @@ macro(makeInput inputName format) if (";${ARGN};" MATCHES ";with_srt;") target_link_libraries(MistIn${inputName} mist_srt ) endif() + if (";${ARGN};" MATCHES ";with_rist;") + target_link_libraries(MistIn${inputName} rist cjson) + endif() #Set compile definitions unset(my_definitions) @@ -504,6 +507,9 @@ makeInput(SDP sdp) if(SRT_LIB) makeInput(TSSRT tssrt with_srt)#LTS endif() +if(RIST_LIB) + makeInput(TSRIST tsrist with_rist)#LTS +endif() ######################################## # MistServer - Outputs # diff --git a/src/input/input_tsrist.cpp b/src/input/input_tsrist.cpp new file mode 100644 index 00000000..18b05c9d --- /dev/null +++ b/src/input/input_tsrist.cpp @@ -0,0 +1,246 @@ +#include "input_tsrist.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +Mist::inputTSRIST *connPtr = 0; +Util::Config *cnfPtr = 0; + + +struct rist_logging_settings log_settings; +int rist_log_callback(void *, enum rist_log_level llvl, const char *msg){ + switch (llvl){ + case RIST_LOG_WARN: WARN_MSG("RIST: %s", msg); break; + case RIST_LOG_ERROR: ERROR_MSG("RIST: %s", msg); break; + case RIST_LOG_DEBUG: + case RIST_LOG_SIMULATE: DONTEVEN_MSG("RIST: %s", msg); break; + default: INFO_MSG("RIST: %s", msg); + } + return 0; +} + +uint64_t pktReceived = 0; +uint64_t pktLost = 0; +uint64_t pktRetransmitted = 0; +uint64_t downBytes = 0; + +int cb_stats(void *arg, const struct rist_stats *stats_container){ + JSON::Value stats = JSON::fromString(stats_container->stats_json, stats_container->json_size); + JSON::Value & sObj = stats["receiver-stats"]["flowinstant"]["stats"]; + pktReceived += sObj["received"].asInt(); + pktLost += sObj["lost"].asInt(); + pktRetransmitted += sObj["retries"].asInt(); + return 0; +} + +static int cb_recv(void *arg, struct rist_data_block *b){ + downBytes += b->payload_len; + if (cnfPtr && cnfPtr->is_active){ + connPtr->addData((const char*)b->payload, b->payload_len); + } + rist_receiver_data_block_free2(&b); + return 0; +} + + +namespace Mist{ + /// Constructor of TS Input + /// \arg cfg Util::Config that contains all current configurations. + inputTSRIST::inputTSRIST(Util::Config *cfg) : Input(cfg){ + connPtr = this; + cnfPtr = config; + + //Setup logger + log_settings.log_cb = &rist_log_callback; + log_settings.log_cb_arg = 0; + log_settings.log_socket = -1; + log_settings.log_stream = 0; + if (Util::printDebugLevel >= 10){ + log_settings.log_level = RIST_LOG_SIMULATE; + }else if(Util::printDebugLevel >= 4){ + log_settings.log_level = RIST_LOG_INFO; + }else{ + log_settings.log_level = RIST_LOG_WARN; + } + + capa["name"] = "TSRIST"; + capa["desc"] = "This input allows for processing MPEG2-TS-based RIST streams."; + capa["source_match"].append("rist://*"); + // These can/may be set to always-on mode + capa["always_match"].append("rist://*"); + capa["priority"] = 9; + capa["codecs"][0u][0u].append("H264"); + capa["codecs"][0u][0u].append("HEVC"); + capa["codecs"][0u][0u].append("MPEG2"); + capa["codecs"][0u][1u].append("AAC"); + capa["codecs"][0u][1u].append("MP3"); + capa["codecs"][0u][1u].append("AC3"); + capa["codecs"][0u][1u].append("MP2"); + capa["codecs"][0u][1u].append("opus"); + + JSON::Value option; + option["arg"] = "integer"; + option["long"] = "buffer"; + option["short"] = "b"; + option["help"] = "DVR buffer time in ms"; + option["value"].append(50000); + config->addOption("bufferTime", option); + option.null(); + capa["optional"]["DVR"]["name"] = "Buffer time (ms)"; + capa["optional"]["DVR"]["help"] = + "The target available buffer time for this live stream, in milliseconds. This is the time " + "available to seek around in, and will automatically be extended to fit whole keyframes as " + "well as the minimum duration needed for stable playback."; + capa["optional"]["DVR"]["option"] = "--buffer"; + capa["optional"]["DVR"]["type"] = "uint"; + capa["optional"]["DVR"]["default"] = 50000; + + option["arg"] = "integer"; + option["long"] = "profile"; + option["short"] = "P"; + option["help"] = "RIST profile (0=Simple, 1=Main)"; + option["value"].append(1); + config->addOption("profile", option); + option.null(); + capa["optional"]["profile"]["name"] = "RIST profile"; + capa["optional"]["profile"]["help"] = "RIST profile to use"; + capa["optional"]["profile"]["type"] = "select"; + capa["optional"]["profile"]["default"] = 1; + capa["optional"]["profile"]["select"][0u][0u] = 0; + capa["optional"]["profile"]["select"][0u][1u] = "Simple"; + capa["optional"]["profile"]["select"][1u][0u] = 1; + capa["optional"]["profile"]["select"][1u][1u] = "Main"; + capa["optional"]["profile"]["type"] = "select"; + capa["optional"]["profile"]["option"] = "--profile"; + + lastTimeStamp = 0; + timeStampOffset = 0; + receiver_ctx = 0; + } + + inputTSRIST::~inputTSRIST(){ + cnfPtr = 0; + rist_destroy(receiver_ctx); + } + + bool inputTSRIST::checkArguments(){return true;} + + /// Live Setup of SRT Input. Runs only if we are the "main" thread + bool inputTSRIST::preRun(){ + std::string source = config->getString("input"); + standAlone = false; + HTTP::URL u(source); + if (u.protocol != "rist"){ + FAIL_MSG("Input protocol must begin with rist://"); + return false; + } + std::map arguments; + HTTP::parseVars(u.args, arguments); + return true; + } + + // Retrieve the next packet to be played from the srt connection. + void inputTSRIST::getNext(size_t idx){ + thisPacket.null(); + while (!thisPacket && config->is_active){ + if (tsStream.hasPacket()){ + tsStream.getEarliestPacket(thisPacket); + }else{ + Util::sleep(50); + if (!bufferActive()){ + Util::logExitReason("Buffer shut down"); + return; + } + } + } + if (!config->is_active){return;} + + tsStream.initializeMetadata(meta); + thisIdx = M.trackIDToIndex(thisPacket.getTrackId(), getpid()); + if (thisIdx == INVALID_TRACK_ID){getNext(idx);} + + uint64_t adjustTime = thisPacket.getTime() + timeStampOffset; + if (lastTimeStamp || timeStampOffset){ + if (lastTimeStamp + 5000 < adjustTime || lastTimeStamp > adjustTime + 5000){ + INFO_MSG("Timestamp jump " PRETTY_PRINT_MSTIME " -> " PRETTY_PRINT_MSTIME ", compensating.", + PRETTY_ARG_MSTIME(lastTimeStamp), PRETTY_ARG_MSTIME(adjustTime)); + timeStampOffset += (lastTimeStamp - adjustTime); + adjustTime = thisPacket.getTime() + timeStampOffset; + } + } + lastTimeStamp = adjustTime; + thisPacket.setTime(adjustTime); + } + + void inputTSRIST::onFail(const std::string & msg){ + FAIL_MSG("%s", msg.c_str()); + Util::logExitReason(msg.c_str()); + } + + bool inputTSRIST::openStreamSource(){ + if (rist_receiver_create(&receiver_ctx, (rist_profile)config->getInteger("profile"), &log_settings) != 0){ + onFail("Failed to create RIST receiver context"); + return false; + } + struct rist_peer_config *peer_config_link = 0; + if (rist_parse_address2(config->getString("input").c_str(), &peer_config_link)){ + onFail("Failed to parse input URL: "+config->getString("input")); + return false; + } + strcpy(peer_config_link->cname, streamName.c_str()); + struct rist_peer *peer; + if (rist_peer_create(receiver_ctx, &peer, peer_config_link) == -1){ + onFail("Could not create RIST peer"); + return false; + } + if (rist_stats_callback_set(receiver_ctx, 1000, cb_stats, 0) == -1){ + onFail("Error setting up RIST stats callback"); + return false; + } + if (rist_receiver_data_callback_set2(receiver_ctx, cb_recv, 0)){ + onFail("Error setting up RIST data callback"); + return false; + } + if (rist_start(receiver_ctx) == -1){ + onFail("Failed to start RIST connection"); + return false; + } + return true; + } + + void inputTSRIST::addData(const char * ptr, size_t len){ + for (size_t o = 0; o <= len-188; o += 188){ + tsStream.parse((char*)ptr+o, 0); + } + } + + + void inputTSRIST::connStats(Comms::Statistics &statComm){ + statComm.setUp(0); + statComm.setDown(downBytes); + statComm.setHost(getConnectedBinHost()); + statComm.setPacketCount(pktReceived); + statComm.setPacketLostCount(pktLost); + statComm.setPacketRetransmitCount(pktRetransmitted); + } + +}// namespace Mist diff --git a/src/input/input_tsrist.h b/src/input/input_tsrist.h new file mode 100644 index 00000000..3bc29e1c --- /dev/null +++ b/src/input/input_tsrist.h @@ -0,0 +1,39 @@ +#include "input.h" +#include +#include +#include + +namespace Mist{ + + class inputTSRIST : public Input{ + public: + inputTSRIST(Util::Config *cfg); + ~inputTSRIST(); + virtual bool needsLock(){return false;} + virtual bool publishesTracks(){return false;} + virtual std::string getConnectedBinHost(){ + return Input::getConnectedBinHost(); + } + void onFail(const std::string & msg); + void addData(const char * ptr, size_t len); + + protected: + // Private Functions + bool checkArguments(); + bool preRun(); + virtual void getNext(size_t idx = INVALID_TRACK_ID); + virtual bool needHeader(){return false;} + + bool openStreamSource(); + TS::Stream tsStream; ///< Used for parsing the incoming ts stream + TS::Packet tsBuf; + int64_t timeStampOffset; + uint64_t lastTimeStamp; + + virtual void connStats(Comms::Statistics &statComm); + + struct rist_ctx *receiver_ctx; + }; +}// namespace Mist + +typedef Mist::inputTSRIST mistIn;