Fixed TS, added several parameters for MistConnTS, controller now builds parameters correctly (spaces issue)

This commit is contained in:
ThatGuy 2013-07-29 11:40:16 +02:00
parent 67815bcce4
commit 2cc27efcae
4 changed files with 154 additions and 16 deletions

View file

@ -293,7 +293,7 @@ namespace Connector_HTTP {
}
}
ToPack.prepend(TS::Packet::getPESVideoLeadIn(0ul, Strm.getPacket()["time"].asInt() * 90));
PIDno = 0x100;
PIDno = 0x100 - 1 + Strm.getPacket()["trackid"].asInt();
ContCounter = &VideoCounter;
}else if (Strm.lastType() == DTSC::AUDIO){
ToPack.append(TS::GetAudioHeader(Strm.lastData().size(), Strm.getTrackById(audioTrackID)["init"].asString()));
@ -303,7 +303,7 @@ namespace Connector_HTTP {
}else{
ToPack.prepend(TS::Packet::getPESAudioLeadIn(ToPack.bytes(1073741824ul), Strm.getPacket()["time"].asInt() * 90));
}
PIDno = 0x101;
PIDno = 0x100 - 1 + Strm.getPacket()["trackid"].asInt();
ContCounter = &AudioCounter;
IsKeyFrame = false;
}

View file

@ -29,7 +29,7 @@ namespace Connector_TS {
///\param conn A socket describing the connection the client.
///\param streamName The stream to connect to.
///\return The exit code of the connector.
int tsConnector(Socket::Connection conn, std::string streamName){
int tsConnector(Socket::Connection conn, std::string streamName, std::string trackIDs){
std::string ToPack;
TS::Packet PackData;
std::string DTMIData;
@ -59,15 +59,54 @@ namespace Connector_TS {
conn.close();
break;
}
ss.SendNow("p\n");
if(trackIDs == ""){
// no track ids given? Find the first video and first audio track (if available) and use those!
int videoID = -1;
int audioID = -1;
//make sure metadata is received
while ( !Strm.metadata && ss.connected()){
if (ss.spool()){
while (Strm.parsePacket(ss.Received())){
//do nothing
}
}
}
if (Strm.metadata.isMember("tracks")){
for (JSON::ObjIter trackIt = Strm.metadata["tracks"].ObjBegin(); trackIt != Strm.metadata["tracks"].ObjEnd(); trackIt++){
if (audioID == -1 && trackIt->second["type"].asString() == "audio"){
audioID = trackIt->second["trackid"].asInt();
if( trackIDs != ""){
trackIDs += " " + trackIt->second["trackid"].asString();
}else{
trackIDs = trackIt->second["trackid"].asString();
}
}
if (videoID == -1 && trackIt->second["type"].asString() == "video"){
videoID = trackIt->second["trackid"].asInt();
if( trackIDs != ""){
trackIDs += " " + trackIt->second["trackid"].asString();
}else{
trackIDs = trackIt->second["trackid"].asString();
}
}
} // for iterator
} // if isMember("tracks")
} // if trackIDs == ""
std::string cmd = "t " + trackIDs + "\np\n";
ss.SendNow( cmd );
inited = true;
}
if (ss.spool()){
while (Strm.parsePacket(ss.Received())){
if ( !haveAvcc){
avccbox.setPayload(Strm.metadata["video"]["init"].asString());
haveAvcc = true;
}
std::stringstream TSBuf;
Socket::Buffer ToPack;
//write PAT and PMT TS packets
@ -82,12 +121,17 @@ namespace Connector_TS {
int PIDno = 0;
char * ContCounter = 0;
if (Strm.lastType() == DTSC::VIDEO){
if ( !haveAvcc){
avccbox.setPayload(Strm.getTrackById(Strm.getPacket()["trackid"].asInt())["init"].asString());
haveAvcc = true;
}
IsKeyFrame = Strm.getPacket().isMember("keyframe");
if (IsKeyFrame){
TimeStamp = (Strm.getPacket()["time"].asInt() * 27000);
}
ToPack.append(avccbox.asAnnexB());
while (Strm.lastData().size()){
while (Strm.lastData().size() > 4){
ThisNaluSize = (Strm.lastData()[0] << 24) + (Strm.lastData()[1] << 16) + (Strm.lastData()[2] << 8) + Strm.lastData()[3];
Strm.lastData().replace(0, 4, TS::NalHeader, 4);
if (ThisNaluSize + 4 == Strm.lastData().size()){
@ -99,14 +143,15 @@ namespace Connector_TS {
}
}
ToPack.prepend(TS::Packet::getPESVideoLeadIn(0ul, Strm.getPacket()["time"].asInt() * 90));
PIDno = 0x100;
PIDno = 0x100 - 1 + Strm.getPacket()["trackid"].asInt();
ContCounter = &VideoCounter;
}else if (Strm.lastType() == DTSC::AUDIO){
ToPack.append(TS::GetAudioHeader(Strm.lastData().size(), Strm.metadata["audio"]["init"].asString()));
ToPack.append(TS::GetAudioHeader(Strm.lastData().size(), Strm.getTrackById(Strm.getPacket()["trackid"].asInt())["init"].asString()));
ToPack.append(Strm.lastData());
ToPack.prepend(TS::Packet::getPESAudioLeadIn(ToPack.bytes(1073741824ul), Strm.getPacket()["time"].asInt() * 90));
PIDno = 0x101;
PIDno = 0x100 - 1 + Strm.getPacket()["trackid"].asInt();
ContCounter = &AudioCounter;
IsKeyFrame = false;
}
//initial packet
@ -154,6 +199,8 @@ int main(int argc, char ** argv){
Util::Config conf(argv[0], PACKAGE_VERSION);
conf.addOption("streamname",
JSON::fromString("{\"arg\":\"string\",\"arg_num\":1,\"help\":\"The name of the stream that this connector will transmit.\"}"));
conf.addOption("tracks",
JSON::fromString("{\"arg\":\"string\",\"default\":\"\",\"short\": \"t\",\"long\":\"tracks\",\"help\":\"The track IDs of the stream that this connector will transmit separated by spaces.\"}"));
conf.addConnectorOptions(8888);
conf.parseArgs(argc, argv);
Socket::Server server_socket = Socket::Server(conf.getInteger("listen_port"), conf.getString("listen_interface"));
@ -167,7 +214,7 @@ int main(int argc, char ** argv){
if (S.connected()){ //check if the new connection is valid
pid_t myid = fork();
if (myid == 0){ //if new child, start MAINHANDLER
return Connector_TS::tsConnector(S, conf.getString("streamname"));
return Connector_TS::tsConnector(S, conf.getString("streamname"), conf.getString("tracks"));
}else{ //otherwise, do nothing or output debugging text
#if DEBUG >= 5
fprintf(stderr, "Spawned new process %i for socket %i\n", (int)myid, S.getSocket());

View file

@ -184,6 +184,9 @@ namespace Controller {
capa["connectors"]["TS"]["optional"]["username"]["name"] = "Username";
capa["connectors"]["TS"]["optional"]["username"]["help"] = "Username to drop privileges to - default if unprovided means do not drop privileges";
capa["connectors"]["TS"]["optional"]["username"]["type"] = "str";
capa["connectors"]["TS"]["optional"]["tracks"]["name"] = "Tracks";
capa["connectors"]["TS"]["optional"]["tracks"]["help"] = "The track IDs of the stream that this connector will transmit separated by spaces";
capa["connectors"]["TS"]["optional"]["tracks"]["type"] = "str";
capa["connectors"]["HTTP"]["desc"] =
"Enables the generic HTTP listener, required by all other HTTP protocols. Needs other HTTP protocols enabled to do much of anything.";
capa["connectors"]["HTTP"]["deps"] = "";

View file

@ -1,3 +1,6 @@
#include <stdio.h> // cout, cerr
#include <string>
#include <cstring> // strcpy
#include <mist/json.h>
#include <mist/config.h>
#include <mist/procs.h>
@ -10,6 +13,8 @@ namespace Controller {
static std::map<std::string, std::string> currentConnectors; ///<The currently running connectors.
///\brief Checks if the binary mentioned in the protocol argument is currently active, if so, restarts it.
///\param protocol The protocol to check.
void UpdateProtocol(std::string protocol){
@ -32,7 +37,64 @@ namespace Controller {
}
}
///\brief Checks current protocol configuration, updates state of enabled connectors if neccesary.
void buildPipedArguments(JSON::Value & p, std::string conn, char * argarr[]){
int argnum = 2; //first two are progname and -n
std::string arg;
std::string conname;
for (JSON::ArrIter ait = p.ArrBegin(); ait != p.ArrEnd(); ait++){
conname = (std::string("MistConn") + ( *ait)["connector"].asString());
conn = conn.substr(0, conn.find(" ") );
if ( !( *ait).isMember("connector") || ( *ait)["connector"].asString() == "" || conn != conname){
continue;
}
std::string tmppath = Util::getMyPath() + std::string("MistConn") + ( *ait)["connector"].asString();
argarr[0] = new char[ tmppath.size() + 1]; std::strncpy(argarr[0], tmppath.c_str(), tmppath.size() + 1);
argarr[1] = new char[3]; std::strncpy(argarr[1], "-n\0", 3);
if (( *ait).isMember("port") && ( *ait)["port"].asInt() != 0){
arg = ( *ait)["port"].asString();
argarr[argnum] = new char[3]; std::strncpy(argarr[argnum], "-p\0", 3); argnum++;
argarr[argnum] = new char[arg.size() + 1]; std::strncpy(argarr[argnum], arg.c_str(), arg.size() + 1); argnum++;
}
if (( *ait).isMember("interface") && ( *ait)["interface"].asString() != "" && ( *ait)["interface"].asString() != "0.0.0.0"){
arg = ( *ait)["interface"].asString();
argarr[argnum] = new char[3]; std::strncpy(argarr[argnum], "-i\0", 3); argnum++;
argarr[argnum] = new char[arg.size() + 1]; std::strncpy(argarr[argnum], arg.c_str(), arg.size() + 1 ); argnum++;
}
if (( *ait).isMember("username") && ( *ait)["username"].asString() != "" && ( *ait)["username"].asString() != "root"){
arg = ( *ait)["username"].asString();
argarr[argnum] = new char[3]; std::strncpy(argarr[argnum], "-u\0", 3); argnum++;
argarr[argnum] = new char[arg.size() + 1]; std::strncpy(argarr[argnum], arg.c_str(), arg.size() + 1); argnum++;
}
if (( *ait).isMember("tracks") && ( *ait)["tracks"].asString() != ""){
arg = ( *ait)["tracks"].asString();
argarr[argnum] = new char[3]; std::strncpy(argarr[argnum], "-t\0", 3); argnum++;
argarr[argnum] = new char[arg.size() + 1]; std::strncpy(argarr[argnum], arg.c_str(), arg.size() + 1); argnum++;
}
if (( *ait).isMember("args") && ( *ait)["args"].asString() != ""){
arg = ( *ait)["args"].asString();
argarr[argnum] = new char[arg.size() + 1]; std::strncpy(argarr[argnum], arg.c_str(), arg.size() + 1); argnum++;
}
}
argarr[argnum] = NULL;
}
///\brief Checks current protocol coguration, updates state of enabled connectors if neccesary.
///\param p An object containing all protocols.
void CheckProtocols(JSON::Value & p){
std::map<std::string, std::string> new_connectors;
@ -40,6 +102,13 @@ namespace Controller {
bool haveHTTPgeneric = false;
bool haveHTTPspecific = false;
// used for building args
int zero = 0;
int out = fileno(stdout);
int err = fileno(stderr);
char * argarr[15]; // approx max # of args (with a wide margin)
int i;
std::string tmp;
JSON::Value counter = (long long int)0;
@ -48,7 +117,9 @@ namespace Controller {
continue;
}
tmp = std::string("MistConn") + ( *ait)["connector"].asString() + std::string(" -n");
tmp = std::string("MistConn") + ( *ait)["connector"].asString();
tmp += std::string(" -n");
if (( *ait)["connector"].asString() == "HTTP"){
haveHTTPgeneric = true;
}
@ -62,12 +133,17 @@ namespace Controller {
if (( *ait).isMember("interface") && ( *ait)["interface"].asString() != "" && ( *ait)["interface"].asString() != "0.0.0.0"){
tmp += std::string(" -i ") + ( *ait)["interface"].asString();
}
if (( *ait).isMember("username") && ( *ait)["username"].asString() != "" && ( *ait)["username"].asString() != "root"){
tmp += std::string(" -u ") + ( *ait)["username"].asString();
}
if (( *ait).isMember("tracks") && ( *ait)["tracks"].asString() != ""){
tmp += std::string(" -t \"") + ( *ait)["tracks"].asString() + "\"";
}
if (( *ait).isMember("args") && ( *ait)["args"].asString() != ""){
tmp += std::string(" ") + ( *ait)["args"].asString();
}
@ -93,7 +169,19 @@ namespace Controller {
for (iter = new_connectors.begin(); iter != new_connectors.end(); iter++){
if (currentConnectors.count(iter->first) != 1 || currentConnectors[iter->first] != iter->second || !Util::Procs::isActive(iter->first)){
Log("CONF", "Starting connector: " + iter->second);
Util::Procs::Start(iter->first, Util::getMyPath() + iter->second);
// clear out old args
for (i=0;i<15;i++)
{
argarr[i] = NULL;
}
// get args for this connector
buildPipedArguments(p, iter->second, (char **)&argarr);
// start piped w/ generated args
Util::Procs::StartPiped(iter->first, argarr, &zero, &out, &err);
}
}