Added TS RIST input
This commit is contained in:
parent
6d4c52c3c0
commit
d22604c53a
3 changed files with 291 additions and 0 deletions
|
@ -462,6 +462,9 @@ macro(makeInput inputName format)
|
||||||
if (";${ARGN};" MATCHES ";with_srt;")
|
if (";${ARGN};" MATCHES ";with_srt;")
|
||||||
target_link_libraries(MistIn${inputName} mist_srt )
|
target_link_libraries(MistIn${inputName} mist_srt )
|
||||||
endif()
|
endif()
|
||||||
|
if (";${ARGN};" MATCHES ";with_rist;")
|
||||||
|
target_link_libraries(MistIn${inputName} rist cjson)
|
||||||
|
endif()
|
||||||
|
|
||||||
#Set compile definitions
|
#Set compile definitions
|
||||||
unset(my_definitions)
|
unset(my_definitions)
|
||||||
|
@ -504,6 +507,9 @@ makeInput(SDP sdp)
|
||||||
if(SRT_LIB)
|
if(SRT_LIB)
|
||||||
makeInput(TSSRT tssrt with_srt)#LTS
|
makeInput(TSSRT tssrt with_srt)#LTS
|
||||||
endif()
|
endif()
|
||||||
|
if(RIST_LIB)
|
||||||
|
makeInput(TSRIST tsrist with_rist)#LTS
|
||||||
|
endif()
|
||||||
|
|
||||||
########################################
|
########################################
|
||||||
# MistServer - Outputs #
|
# MistServer - Outputs #
|
||||||
|
|
246
src/input/input_tsrist.cpp
Normal file
246
src/input/input_tsrist.cpp
Normal file
|
@ -0,0 +1,246 @@
|
||||||
|
#include "input_tsrist.h"
|
||||||
|
#include <cerrno>
|
||||||
|
#include <cstdio>
|
||||||
|
#include <cstdlib>
|
||||||
|
#include <cstring>
|
||||||
|
#include <fstream>
|
||||||
|
#include <iomanip>
|
||||||
|
#include <iostream>
|
||||||
|
#include <mist/defines.h>
|
||||||
|
#include <mist/downloader.h>
|
||||||
|
#include <mist/flv_tag.h>
|
||||||
|
#include <mist/http_parser.h>
|
||||||
|
#include <mist/mp4_generic.h>
|
||||||
|
#include <mist/socket_srt.h>
|
||||||
|
#include <mist/stream.h>
|
||||||
|
#include <mist/timing.h>
|
||||||
|
#include <mist/ts_packet.h>
|
||||||
|
#include <mist/util.h>
|
||||||
|
#include <string>
|
||||||
|
|
||||||
|
#include <mist/procs.h>
|
||||||
|
#include <mist/tinythread.h>
|
||||||
|
#include <sys/stat.h>
|
||||||
|
|
||||||
|
Mist::inputTSRIST *connPtr = 0;
|
||||||
|
Util::Config *cnfPtr = 0;
|
||||||
|
|
||||||
|
|
||||||
|
struct rist_logging_settings log_settings;
|
||||||
|
int rist_log_callback(void *, enum rist_log_level llvl, const char *msg){
|
||||||
|
switch (llvl){
|
||||||
|
case RIST_LOG_WARN: WARN_MSG("RIST: %s", msg); break;
|
||||||
|
case RIST_LOG_ERROR: ERROR_MSG("RIST: %s", msg); break;
|
||||||
|
case RIST_LOG_DEBUG:
|
||||||
|
case RIST_LOG_SIMULATE: DONTEVEN_MSG("RIST: %s", msg); break;
|
||||||
|
default: INFO_MSG("RIST: %s", msg);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
uint64_t pktReceived = 0;
|
||||||
|
uint64_t pktLost = 0;
|
||||||
|
uint64_t pktRetransmitted = 0;
|
||||||
|
uint64_t downBytes = 0;
|
||||||
|
|
||||||
|
int cb_stats(void *arg, const struct rist_stats *stats_container){
|
||||||
|
JSON::Value stats = JSON::fromString(stats_container->stats_json, stats_container->json_size);
|
||||||
|
JSON::Value & sObj = stats["receiver-stats"]["flowinstant"]["stats"];
|
||||||
|
pktReceived += sObj["received"].asInt();
|
||||||
|
pktLost += sObj["lost"].asInt();
|
||||||
|
pktRetransmitted += sObj["retries"].asInt();
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int cb_recv(void *arg, struct rist_data_block *b){
|
||||||
|
downBytes += b->payload_len;
|
||||||
|
if (cnfPtr && cnfPtr->is_active){
|
||||||
|
connPtr->addData((const char*)b->payload, b->payload_len);
|
||||||
|
}
|
||||||
|
rist_receiver_data_block_free2(&b);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
namespace Mist{
|
||||||
|
/// Constructor of TS Input
|
||||||
|
/// \arg cfg Util::Config that contains all current configurations.
|
||||||
|
inputTSRIST::inputTSRIST(Util::Config *cfg) : Input(cfg){
|
||||||
|
connPtr = this;
|
||||||
|
cnfPtr = config;
|
||||||
|
|
||||||
|
//Setup logger
|
||||||
|
log_settings.log_cb = &rist_log_callback;
|
||||||
|
log_settings.log_cb_arg = 0;
|
||||||
|
log_settings.log_socket = -1;
|
||||||
|
log_settings.log_stream = 0;
|
||||||
|
if (Util::printDebugLevel >= 10){
|
||||||
|
log_settings.log_level = RIST_LOG_SIMULATE;
|
||||||
|
}else if(Util::printDebugLevel >= 4){
|
||||||
|
log_settings.log_level = RIST_LOG_INFO;
|
||||||
|
}else{
|
||||||
|
log_settings.log_level = RIST_LOG_WARN;
|
||||||
|
}
|
||||||
|
|
||||||
|
capa["name"] = "TSRIST";
|
||||||
|
capa["desc"] = "This input allows for processing MPEG2-TS-based RIST streams.";
|
||||||
|
capa["source_match"].append("rist://*");
|
||||||
|
// These can/may be set to always-on mode
|
||||||
|
capa["always_match"].append("rist://*");
|
||||||
|
capa["priority"] = 9;
|
||||||
|
capa["codecs"][0u][0u].append("H264");
|
||||||
|
capa["codecs"][0u][0u].append("HEVC");
|
||||||
|
capa["codecs"][0u][0u].append("MPEG2");
|
||||||
|
capa["codecs"][0u][1u].append("AAC");
|
||||||
|
capa["codecs"][0u][1u].append("MP3");
|
||||||
|
capa["codecs"][0u][1u].append("AC3");
|
||||||
|
capa["codecs"][0u][1u].append("MP2");
|
||||||
|
capa["codecs"][0u][1u].append("opus");
|
||||||
|
|
||||||
|
JSON::Value option;
|
||||||
|
option["arg"] = "integer";
|
||||||
|
option["long"] = "buffer";
|
||||||
|
option["short"] = "b";
|
||||||
|
option["help"] = "DVR buffer time in ms";
|
||||||
|
option["value"].append(50000);
|
||||||
|
config->addOption("bufferTime", option);
|
||||||
|
option.null();
|
||||||
|
capa["optional"]["DVR"]["name"] = "Buffer time (ms)";
|
||||||
|
capa["optional"]["DVR"]["help"] =
|
||||||
|
"The target available buffer time for this live stream, in milliseconds. This is the time "
|
||||||
|
"available to seek around in, and will automatically be extended to fit whole keyframes as "
|
||||||
|
"well as the minimum duration needed for stable playback.";
|
||||||
|
capa["optional"]["DVR"]["option"] = "--buffer";
|
||||||
|
capa["optional"]["DVR"]["type"] = "uint";
|
||||||
|
capa["optional"]["DVR"]["default"] = 50000;
|
||||||
|
|
||||||
|
option["arg"] = "integer";
|
||||||
|
option["long"] = "profile";
|
||||||
|
option["short"] = "P";
|
||||||
|
option["help"] = "RIST profile (0=Simple, 1=Main)";
|
||||||
|
option["value"].append(1);
|
||||||
|
config->addOption("profile", option);
|
||||||
|
option.null();
|
||||||
|
capa["optional"]["profile"]["name"] = "RIST profile";
|
||||||
|
capa["optional"]["profile"]["help"] = "RIST profile to use";
|
||||||
|
capa["optional"]["profile"]["type"] = "select";
|
||||||
|
capa["optional"]["profile"]["default"] = 1;
|
||||||
|
capa["optional"]["profile"]["select"][0u][0u] = 0;
|
||||||
|
capa["optional"]["profile"]["select"][0u][1u] = "Simple";
|
||||||
|
capa["optional"]["profile"]["select"][1u][0u] = 1;
|
||||||
|
capa["optional"]["profile"]["select"][1u][1u] = "Main";
|
||||||
|
capa["optional"]["profile"]["type"] = "select";
|
||||||
|
capa["optional"]["profile"]["option"] = "--profile";
|
||||||
|
|
||||||
|
lastTimeStamp = 0;
|
||||||
|
timeStampOffset = 0;
|
||||||
|
receiver_ctx = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
inputTSRIST::~inputTSRIST(){
|
||||||
|
cnfPtr = 0;
|
||||||
|
rist_destroy(receiver_ctx);
|
||||||
|
}
|
||||||
|
|
||||||
|
bool inputTSRIST::checkArguments(){return true;}
|
||||||
|
|
||||||
|
/// Live Setup of SRT Input. Runs only if we are the "main" thread
|
||||||
|
bool inputTSRIST::preRun(){
|
||||||
|
std::string source = config->getString("input");
|
||||||
|
standAlone = false;
|
||||||
|
HTTP::URL u(source);
|
||||||
|
if (u.protocol != "rist"){
|
||||||
|
FAIL_MSG("Input protocol must begin with rist://");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
std::map<std::string, std::string> arguments;
|
||||||
|
HTTP::parseVars(u.args, arguments);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Retrieve the next packet to be played from the srt connection.
|
||||||
|
void inputTSRIST::getNext(size_t idx){
|
||||||
|
thisPacket.null();
|
||||||
|
while (!thisPacket && config->is_active){
|
||||||
|
if (tsStream.hasPacket()){
|
||||||
|
tsStream.getEarliestPacket(thisPacket);
|
||||||
|
}else{
|
||||||
|
Util::sleep(50);
|
||||||
|
if (!bufferActive()){
|
||||||
|
Util::logExitReason("Buffer shut down");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!config->is_active){return;}
|
||||||
|
|
||||||
|
tsStream.initializeMetadata(meta);
|
||||||
|
thisIdx = M.trackIDToIndex(thisPacket.getTrackId(), getpid());
|
||||||
|
if (thisIdx == INVALID_TRACK_ID){getNext(idx);}
|
||||||
|
|
||||||
|
uint64_t adjustTime = thisPacket.getTime() + timeStampOffset;
|
||||||
|
if (lastTimeStamp || timeStampOffset){
|
||||||
|
if (lastTimeStamp + 5000 < adjustTime || lastTimeStamp > adjustTime + 5000){
|
||||||
|
INFO_MSG("Timestamp jump " PRETTY_PRINT_MSTIME " -> " PRETTY_PRINT_MSTIME ", compensating.",
|
||||||
|
PRETTY_ARG_MSTIME(lastTimeStamp), PRETTY_ARG_MSTIME(adjustTime));
|
||||||
|
timeStampOffset += (lastTimeStamp - adjustTime);
|
||||||
|
adjustTime = thisPacket.getTime() + timeStampOffset;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
lastTimeStamp = adjustTime;
|
||||||
|
thisPacket.setTime(adjustTime);
|
||||||
|
}
|
||||||
|
|
||||||
|
void inputTSRIST::onFail(const std::string & msg){
|
||||||
|
FAIL_MSG("%s", msg.c_str());
|
||||||
|
Util::logExitReason(msg.c_str());
|
||||||
|
}
|
||||||
|
|
||||||
|
bool inputTSRIST::openStreamSource(){
|
||||||
|
if (rist_receiver_create(&receiver_ctx, (rist_profile)config->getInteger("profile"), &log_settings) != 0){
|
||||||
|
onFail("Failed to create RIST receiver context");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
struct rist_peer_config *peer_config_link = 0;
|
||||||
|
if (rist_parse_address2(config->getString("input").c_str(), &peer_config_link)){
|
||||||
|
onFail("Failed to parse input URL: "+config->getString("input"));
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
strcpy(peer_config_link->cname, streamName.c_str());
|
||||||
|
struct rist_peer *peer;
|
||||||
|
if (rist_peer_create(receiver_ctx, &peer, peer_config_link) == -1){
|
||||||
|
onFail("Could not create RIST peer");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (rist_stats_callback_set(receiver_ctx, 1000, cb_stats, 0) == -1){
|
||||||
|
onFail("Error setting up RIST stats callback");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (rist_receiver_data_callback_set2(receiver_ctx, cb_recv, 0)){
|
||||||
|
onFail("Error setting up RIST data callback");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (rist_start(receiver_ctx) == -1){
|
||||||
|
onFail("Failed to start RIST connection");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
void inputTSRIST::addData(const char * ptr, size_t len){
|
||||||
|
for (size_t o = 0; o <= len-188; o += 188){
|
||||||
|
tsStream.parse((char*)ptr+o, 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void inputTSRIST::connStats(Comms::Statistics &statComm){
|
||||||
|
statComm.setUp(0);
|
||||||
|
statComm.setDown(downBytes);
|
||||||
|
statComm.setHost(getConnectedBinHost());
|
||||||
|
statComm.setPacketCount(pktReceived);
|
||||||
|
statComm.setPacketLostCount(pktLost);
|
||||||
|
statComm.setPacketRetransmitCount(pktRetransmitted);
|
||||||
|
}
|
||||||
|
|
||||||
|
}// namespace Mist
|
39
src/input/input_tsrist.h
Normal file
39
src/input/input_tsrist.h
Normal file
|
@ -0,0 +1,39 @@
|
||||||
|
#include "input.h"
|
||||||
|
#include <mist/ts_packet.h>
|
||||||
|
#include <mist/ts_stream.h>
|
||||||
|
#include <librist/librist.h>
|
||||||
|
|
||||||
|
namespace Mist{
|
||||||
|
|
||||||
|
class inputTSRIST : public Input{
|
||||||
|
public:
|
||||||
|
inputTSRIST(Util::Config *cfg);
|
||||||
|
~inputTSRIST();
|
||||||
|
virtual bool needsLock(){return false;}
|
||||||
|
virtual bool publishesTracks(){return false;}
|
||||||
|
virtual std::string getConnectedBinHost(){
|
||||||
|
return Input::getConnectedBinHost();
|
||||||
|
}
|
||||||
|
void onFail(const std::string & msg);
|
||||||
|
void addData(const char * ptr, size_t len);
|
||||||
|
|
||||||
|
protected:
|
||||||
|
// Private Functions
|
||||||
|
bool checkArguments();
|
||||||
|
bool preRun();
|
||||||
|
virtual void getNext(size_t idx = INVALID_TRACK_ID);
|
||||||
|
virtual bool needHeader(){return false;}
|
||||||
|
|
||||||
|
bool openStreamSource();
|
||||||
|
TS::Stream tsStream; ///< Used for parsing the incoming ts stream
|
||||||
|
TS::Packet tsBuf;
|
||||||
|
int64_t timeStampOffset;
|
||||||
|
uint64_t lastTimeStamp;
|
||||||
|
|
||||||
|
virtual void connStats(Comms::Statistics &statComm);
|
||||||
|
|
||||||
|
struct rist_ctx *receiver_ctx;
|
||||||
|
};
|
||||||
|
}// namespace Mist
|
||||||
|
|
||||||
|
typedef Mist::inputTSRIST mistIn;
|
Loading…
Add table
Reference in a new issue