Merged MistInTS and MistInTSStream, added support for streamed file input through stream:// source

This commit is contained in:
Thulinma 2016-08-26 01:59:49 +02:00
parent 44fd455c8e
commit eccd3d2949
4 changed files with 97 additions and 123 deletions

View file

@ -295,9 +295,6 @@ macro(makeInput inputName format)
#Set compile definitions
unset(my_definitions)
if (";${ARGN};" MATCHES ";tslive;")
list(APPEND my_definitions "TSLIVE_INPUT")
endif()
list(APPEND my_definitions "INPUTTYPE=\"input_${format}.h\"")
set_target_properties(MistIn${inputName}
@ -326,7 +323,6 @@ makeInput(Buffer buffer)
makeInput(ISMV ismv)#LTS
makeInput(MP4 mp4)#LTS
makeInput(TS ts)#LTS
makeInput(TSStream ts tslive)#LTS
makeInput(Folder folder folder)#LTS
########################################

View file

@ -68,7 +68,7 @@ namespace Controller {
// False: start TS input
INFO_MSG("No TS Input running on port %s for stream %s, starting it", udpPort.c_str(), name.c_str());
std::deque<std::string> command;
command.push_back(Util::getMyPath() + "MistInTSStream");
command.push_back(Util::getMyPath() + "MistInTS");
command.push_back("-s");
command.push_back(name);
command.push_back("-p");

View file

@ -19,22 +19,6 @@
#define SEM_TS_CLAIM "/MstTSIN%s"
/// \todo Implement this trigger equivalent...
/*
if(Triggers::shouldTrigger("STREAM_PUSH", smp)){
std::string payload = streamName+"\n" + myConn.getHost() +"\n"+capa["name"].asStringRef()+"\n"+reqUrl;
if (!Triggers::doTrigger("STREAM_PUSH", payload, smp)){
DEBUG_MSG(DLVL_FAIL, "Push from %s to %s rejected - STREAM_PUSH trigger denied the push", myConn.getHost().c_str(), streamName.c_str());
myConn.close();
configLock.post();
configLock.close();
return;
}
}
*/
#ifdef TSLIVE_INPUT
std::string globalStreamName;
TS::Stream liveStream(true);
Util::Config * cfgPointer = NULL;
@ -108,16 +92,15 @@ void parseThread(void * ignored) {
myProxy.userClient.finish();
}
#endif
namespace Mist {
/// Constructor of TS Input
/// \arg cfg Util::Config that contains all current configurations.
inputTS::inputTS(Util::Config * cfg) : Input(cfg) {
capa["name"] = "TS";
capa["decs"] = "Enables TS Input";
capa["source_match"] = "/*.ts";
capa["decs"] = "MPEG2-TS input from static files, streamed files, or multicast/unicast UDP socket";
capa["source_match"].append("/*.ts");
capa["source_match"].append("stream://*.ts");
capa["priority"] = 9ll;
capa["codecs"][0u][0u].append("H264");
capa["codecs"][0u][0u].append("HEVC");
@ -138,21 +121,16 @@ namespace Mist {
capa["optional"]["multicastinterface"]["type"] = "str";
capa["optional"]["multicastinterface"]["default"] = "";
cfg->addOption("multicastinterface",
JSON::fromString("{\"arg\":\"string\",\"value\":\"\",\"short\":\"M\",\"long\":\"multicast-interface\",\"help\":\"The interfaces on which to listen for UDP Multicast packets, space separatered.\"}"));
JSON::fromString("{\"arg\":\"string\",\"value\":\"\",\"short\":\"M\",\"long\":\"multicast-interface\",\"help\":\"The interfaces on which to listen for UDP Multicast packets, space separated.\"}"));
pushing = false;
inFile = NULL;
#ifdef TSLIVE_INPUT
standAlone = false;
#endif
}
inputTS::~inputTS() {
if (inFile) {
fclose(inFile);
}
#ifdef TSLIVE_INPUT
if (!standAlone){
char semName[NAME_BUFFER_SIZE];
snprintf(semName, NAME_BUFFER_SIZE, SEM_TS_CLAIM, globalStreamName.c_str());
IPC::semaphore lock(semName, O_CREAT | O_RDWR, ACCESSPERMS, 1);
@ -161,18 +139,28 @@ namespace Mist {
claimableThreads.clear();
lock.post();
lock.unlink();
#endif
}
}
#ifdef TSLIVE_INPUT
///Live Setup of TS Input
bool inputTS::setup() {
INFO_MSG("Setup start");
if (config->getString("input") == "-") {
const std::string & inpt = config->getString("input");
if (inpt.size() && (inpt != "-" || inpt.substr(0,9) == "stream://")){
if (inpt.substr(0,9) == "stream://"){
inFile = fopen(inpt.c_str()+9, "r");
standAlone = false;
}else{
inFile = fopen(inpt.c_str(), "r");
}
if (!inFile) {
return false;
}
}else{
standAlone = false;
if (inpt == "-") {
inFile = stdin;
} else {
pushing = true;
udpCon.setBlocking(false);
std::string ipPort = config->getString("port");
size_t colon = ipPort.rfind(':');
@ -182,24 +170,10 @@ namespace Mist {
udpCon.bind(JSON::Value(ipPort).asInt(), "", config->getString("multicastinterface"));
}
}
INFO_MSG("Setup complete");
return true;
}
#else
///Setup of TS Input
bool inputTS::setup() {
if (config->getString("input") != "-") {
inFile = fopen(config->getString("input").c_str(), "r");
}
if (!inFile) {
return false;
}
return true;
}
#endif
///Track selector of TS Input
///\arg trackSpec specifies which tracks are to be selected
@ -219,13 +193,6 @@ namespace Mist {
}
#ifdef TSLIVE_INPUT
//This implementation in used in the live version of TS input, where no header is available in advance.
//Reading the header returns true in this case, to continue parsing the actual stream.
bool inputTS::readHeader() {
return true;
}
#else
///Reads headers from a TS stream, and saves them into metadata
///It works by going through the entire TS stream, and every time
///It encounters a new PES start, it writes the currently found PES data
@ -233,6 +200,7 @@ namespace Mist {
///it writes the remaining metadata.
///\todo Find errors, perhaps parts can be made more modular
bool inputTS::readHeader() {
if (!standAlone){return true;}
if (!inFile){return false;}
//See whether a separate header file exists.
if (readExistingHeader()){return true;}
@ -259,7 +227,6 @@ namespace Mist {
myMeta.toFile(config->getString("input") + ".dtsh");
return true;
}
#endif
///Gets the next packet that is to be sent
///At the moment, the logic of sending the last packet that was finished has been implemented,
@ -269,37 +236,18 @@ namespace Mist {
INSANE_MSG("Getting next");
thisPacket.null();
bool hasPacket = (selectedTracks.size() == 1 ? tsStream.hasPacket(*selectedTracks.begin()) : tsStream.hasPacketOnEachTrack());
while (!hasPacket && (pushing || !feof(inFile)) && config->is_active) {
if (!pushing) {
while (!hasPacket && !feof(inFile) && config->is_active) {
unsigned int bPos = ftell(inFile);
tsBuf.FromFile(inFile);
if (selectedTracks.count(tsBuf.getPID())) {
tsStream.parse(tsBuf, bPos);
}
} else {
while (udpCon.Receive()) {
udpDataBuffer.append(udpCon.data, udpCon.data_len);
while (udpDataBuffer.size() > 188 && (udpDataBuffer[0] != 0x47 || udpDataBuffer[188] != 0x47)) {
size_t syncPos = udpDataBuffer.find("\107", 1);
udpDataBuffer.erase(0, syncPos);
}
while (udpDataBuffer.size() >= 188) {
tsBuf.FromPointer(udpDataBuffer.data());
tsStream.parse(tsBuf, 0);
udpDataBuffer.erase(0, 188);
}
}
Util::sleep(500);
}
hasPacket = (selectedTracks.size() == 1 ? tsStream.hasPacket(*selectedTracks.begin()) : tsStream.hasPacketOnEachTrack());
}
if (!hasPacket) {
if (inFile && !feof(inFile)) {
if (!feof(inFile)) {
getNext();
}
if (pushing) {
sleep(500);
}
return;
}
if (selectedTracks.size() == 1) {
@ -361,17 +309,27 @@ namespace Mist {
fseek(inFile, seekPos, SEEK_SET);//seek to the correct position
}
#ifdef TSLIVE_INPUT
void inputTS::stream() {
if (!Util::startInput(streamName, "push://")) {//manually override stream url to start the buffer
FAIL_MSG("Could not start buffer for %s", streamName.c_str());
return;
}
IPC::sharedClient statsPage = IPC::sharedClient(SHM_STATISTICS, STAT_EX_SIZE, true);
uint64_t downCounter = 0;
uint64_t startTime = Util::epoch();
cfgPointer = config;
globalStreamName = streamName;
unsigned long long threadCheckTimer = Util::bootSecs();
while (config->is_active) {
if (!pushing) {
unsigned int bPos = ftell(inFile);
if (inFile) {
if (feof(inFile)){
config->is_active = false;
INFO_MSG("Reached end of file on streamed input");
}
int ctr = 0;
while (ctr < 20 && tsBuf.FromFile(inFile)){
while (ctr < 20 && tsBuf.FromFile(inFile) && !feof(inFile)){
liveStream.add(tsBuf);
downCounter += 188;
ctr++;
}
} else {
@ -384,6 +342,7 @@ namespace Mist {
if (udpCon.data[0] == 0x47){//check for sync byte
if (offset + 188 <= udpCon.data_len){
liveStream.add(udpCon.data + offset);
downCounter += 188;
}else{
leftData.append(udpCon.data + offset, udpCon.data_len - offset);
}
@ -393,6 +352,7 @@ namespace Mist {
leftData.append(udpCon.data + offset, 1);
if (leftData.size() >= 188){
liveStream.add((char*)leftData.data());
downCounter += 188;
leftData.erase(0, 188);
}
}
@ -403,6 +363,28 @@ namespace Mist {
}
//Check for and spawn threads here.
if (Util::bootSecs() - threadCheckTimer > 2) {
//Connect to stats for INPUT detection
uint64_t now = Util::epoch();
if (!statsPage.getData()){
statsPage = IPC::sharedClient(SHM_STATISTICS, STAT_EX_SIZE, true);
}
if (statsPage.getData()){
if (!statsPage.isAlive()){
config->is_active = false;
return;
}
IPC::statExchange tmpEx(statsPage.getData());
tmpEx.now(now);
tmpEx.crc(getpid());
tmpEx.streamName(streamName);
tmpEx.connector("INPUT");
tmpEx.up(0);
tmpEx.down(downCounter);
tmpEx.time(now - startTime);
tmpEx.lastSecond(0);
statsPage.keepAlive();
}
std::set<unsigned long> activeTracks = liveStream.getActiveTracks();
char semName[NAME_BUFFER_SIZE];
snprintf(semName, NAME_BUFFER_SIZE, SEM_TS_CLAIM, globalStreamName.c_str());
@ -426,7 +408,7 @@ namespace Mist {
lock.post();
threadCheckTimer = Util::bootSecs();
}
if (pushing){
if (!inFile){
Util::sleep(100);
}
}
@ -435,6 +417,10 @@ namespace Mist {
}
void inputTS::finish() {
if (standAlone){
Input::finish();
return;
}
char semName[NAME_BUFFER_SIZE];
snprintf(semName, NAME_BUFFER_SIZE, SEM_TS_CLAIM, globalStreamName.c_str());
IPC::semaphore lock(semName, O_CREAT | O_RDWR, ACCESSPERMS, 1);
@ -452,9 +438,16 @@ namespace Mist {
}
bool inputTS::needsLock() {
//we already know no lock will be needed
if (!standAlone){return false;}
//otherwise, check input param
const std::string & inpt = config->getString("input");
if (inpt.size() && inpt != "-" && inpt.substr(0,9) != "stream://"){
return true;
}else{
return false;
}
#endif
}
}

View file

@ -13,9 +13,7 @@ namespace Mist {
public:
inputTS(Util::Config * cfg);
~inputTS();
#ifdef TSLIVE_INPUT
bool needsLock();
#endif
protected:
//Private Functions
bool setup();
@ -24,25 +22,12 @@ namespace Mist {
void seek(int seekTime);
void trackSelect(std::string trackSpec);
void readPMT();
#ifdef TSLIVE_INPUT
//Live tsinput does not have a header, so parseheader should do nothing
void parseHeader() { }
//In case of live TS Input, we override the default serve function
void stream();
void finish();
#endif
FILE * inFile;///<The input file with ts data
TS::Stream tsStream;///<Used for parsing the incoming ts stream
bool pushing;
Socket::UDPConnection udpCon;
std::string udpDataBuffer;
TS::Packet tsBuf;
};
}