Merge branch 'development' into LTS_development
# Conflicts: # src/output/output.h # src/output/output_rtmp.cpp
This commit is contained in:
commit
daa0833970
6 changed files with 60 additions and 61 deletions
|
@ -365,6 +365,7 @@ namespace DTSC {
|
|||
uint16_t version;
|
||||
long long int moreheader;
|
||||
long long int bufferWindow;
|
||||
std::string sourceURI;
|
||||
};
|
||||
|
||||
/// An iterator helper for easily iterating over the parts in a Fragment.
|
||||
|
|
|
@ -1365,6 +1365,7 @@ namespace DTSC {
|
|||
merged = source.getFlag("merged");
|
||||
bufferWindow = source.getInt("buffer_window");
|
||||
moreheader = source.getInt("moreheader");
|
||||
source.getString("source", sourceURI);
|
||||
Scan tmpTracks = source.getScan().getMember("tracks");
|
||||
unsigned int num = 0;
|
||||
Scan tmpTrack;
|
||||
|
@ -1384,6 +1385,7 @@ namespace DTSC {
|
|||
Meta::Meta(JSON::Value & meta) {
|
||||
vod = meta.isMember("vod") && meta["vod"];
|
||||
live = meta.isMember("live") && meta["live"];
|
||||
sourceURI = meta.isMember("source") ? meta["source"].asStringRef() : "";
|
||||
version = meta.isMember("version") ? meta["version"].asInt() : 0;
|
||||
merged = meta.isMember("merged") && meta["merged"];
|
||||
bufferWindow = 0;
|
||||
|
@ -1826,6 +1828,7 @@ namespace DTSC {
|
|||
}
|
||||
}
|
||||
if (version){dataLen += 18;}
|
||||
if (sourceURI.size()){dataLen += 13+sourceURI.size();}
|
||||
return dataLen + 8; //add 8 bytes header
|
||||
}
|
||||
|
||||
|
@ -1855,6 +1858,11 @@ namespace DTSC {
|
|||
writePointer(p, "\000\007version\001", 10);
|
||||
writePointer(p, convertLongLong(version), 8);
|
||||
}
|
||||
if (sourceURI.size()) {
|
||||
writePointer(p, "\000\006source\002", 9);
|
||||
writePointer(p, convertInt(sourceURI.size()), 4);
|
||||
writePointer(p, sourceURI);
|
||||
}
|
||||
if (bufferWindow) {
|
||||
writePointer(p, "\000\015buffer_window\001", 16);
|
||||
writePointer(p, convertLongLong(bufferWindow), 8);
|
||||
|
@ -1892,6 +1900,11 @@ namespace DTSC {
|
|||
conn.SendNow("\000\007version\001", 10);
|
||||
conn.SendNow(convertLongLong(version), 8);
|
||||
}
|
||||
if (sourceURI.size()) {
|
||||
conn.SendNow("\000\006source\002", 9);
|
||||
conn.SendNow(convertInt(sourceURI.size()), 4);
|
||||
conn.SendNow(sourceURI);
|
||||
}
|
||||
if (bufferWindow) {
|
||||
conn.SendNow("\000\015buffer_window\001", 16);
|
||||
conn.SendNow(convertLongLong(bufferWindow), 8);
|
||||
|
@ -1987,6 +2000,9 @@ namespace DTSC {
|
|||
if (version) {
|
||||
result["version"] = (long long)version;
|
||||
}
|
||||
if (sourceURI.size()){
|
||||
result["source"] = sourceURI;
|
||||
}
|
||||
result["moreheader"] = moreheader;
|
||||
return result;
|
||||
}
|
||||
|
@ -2018,6 +2034,9 @@ namespace DTSC {
|
|||
if (bufferWindow) {
|
||||
str << std::string(indent, ' ') << "Buffer Window: " << bufferWindow << std::endl;
|
||||
}
|
||||
if (sourceURI.size()){
|
||||
str << std::string(indent, ' ') << "Source: " << sourceURI << std::endl;
|
||||
}
|
||||
str << std::string(indent, ' ') << "More Header: " << moreheader << std::endl;
|
||||
}
|
||||
|
||||
|
|
|
@ -173,10 +173,6 @@ namespace Mist {
|
|||
return 0;
|
||||
}
|
||||
|
||||
if (streamName != "") {
|
||||
config->getOption("streamname") = streamName;
|
||||
}
|
||||
streamName = config->getString("streamname");
|
||||
nProxy.streamName = streamName;
|
||||
|
||||
if (!setup()) {
|
||||
|
@ -189,6 +185,7 @@ namespace Mist {
|
|||
std::cerr << "Reading header for " << config->getString("input") << " failed." << std::endl;
|
||||
return 0;
|
||||
}
|
||||
myMeta.sourceURI = config->getString("input");
|
||||
parseHeader();
|
||||
MEDIUM_MSG("Header parsed, %lu tracks", myMeta.tracks.size());
|
||||
|
||||
|
@ -357,7 +354,7 @@ namespace Mist {
|
|||
pullLock.unlink();
|
||||
return;
|
||||
}
|
||||
if (!Util::startInput(streamName, "push://")) {//manually override stream url to start the buffer
|
||||
if (!Util::startInput(streamName, "push://INTERNAL_ONLY:"+config->getString("input"))) {//manually override stream url to start the buffer
|
||||
pullLock.post();
|
||||
pullLock.close();
|
||||
pullLock.unlink();
|
||||
|
|
|
@ -1272,5 +1272,36 @@ namespace Mist{
|
|||
return true;
|
||||
}
|
||||
|
||||
/// Checks if the set streamName allows pushes from this connector/IP/password combination.
|
||||
/// Runs all appropriate triggers and checks.
|
||||
/// Returns true if the push should continue, false otherwise.
|
||||
bool Output::allowPush(const std::string & passwd){
|
||||
std::string strmSource;
|
||||
|
||||
// Initialize the stream source if needed, connect to it
|
||||
initialize();
|
||||
//pull the source setting from metadata
|
||||
strmSource = myMeta.sourceURI;
|
||||
|
||||
if (!strmSource.size()){
|
||||
FAIL_MSG("Push rejected - stream %s not configured", streamName.c_str());
|
||||
return false;
|
||||
}
|
||||
if (strmSource.substr(0, 7) != "push://"){
|
||||
FAIL_MSG("Push rejected - stream %s not a push-able stream. (%s != push://*)", streamName.c_str(), strmSource.c_str());
|
||||
return false;
|
||||
}
|
||||
|
||||
std::string source = strmSource.substr(7);
|
||||
std::string IP = source.substr(0, source.find('@'));
|
||||
if (IP != ""){
|
||||
if (!myConn.isAddress(IP)){
|
||||
FAIL_MSG("Push from %s to %s rejected - source host not whitelisted", getConnectedHost().c_str(), streamName.c_str());
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -127,6 +127,7 @@ namespace Mist {
|
|||
|
||||
std::map<int,DTSCPageData> bookKeeping;
|
||||
virtual bool isRecording(){return false;};
|
||||
bool allowPush(const std::string & passwd);
|
||||
};
|
||||
|
||||
}
|
||||
|
|
|
@ -777,63 +777,13 @@ namespace Mist {
|
|||
}
|
||||
|
||||
Util::sanitizeName(streamName);
|
||||
//pull the server configuration
|
||||
std::string smp = streamName.substr(0,(streamName.find_first_of("+ ")));
|
||||
IPC::sharedPage serverCfg(SHM_CONF, DEFAULT_CONF_PAGE_SIZE); ///< Contains server configuration and capabilities
|
||||
IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1);
|
||||
configLock.wait();
|
||||
|
||||
DTSC::Scan streamCfg = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("streams").getMember(smp);
|
||||
if (streamCfg){
|
||||
if (streamCfg.getMember("source").asString().substr(0, 7) != "push://"){
|
||||
FAIL_MSG("Push rejected - stream %s not a push-able stream. (%s != push://*)", streamName.c_str(), streamCfg.getMember("source").asString().c_str());
|
||||
onFinish();
|
||||
}else{
|
||||
std::string source = streamCfg.getMember("source").asString().substr(7);
|
||||
std::string IP = source.substr(0, source.find('@'));
|
||||
/*LTS-START*/
|
||||
std::string password;
|
||||
if (source.find('@') != std::string::npos){
|
||||
password = source.substr(source.find('@')+1);
|
||||
if (password != ""){
|
||||
if (password == app_name){
|
||||
INFO_MSG("Password accepted - ignoring IP settings.");
|
||||
IP = "";
|
||||
}else{
|
||||
INFO_MSG("Password rejected - checking IP.");
|
||||
if (IP == ""){
|
||||
IP = "deny-all.invalid";
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if(Triggers::shouldTrigger("STREAM_PUSH", smp)){
|
||||
std::string payload = streamName+"\n" + getConnectedHost() +"\n"+capa["name"].asStringRef()+"\n"+reqUrl;
|
||||
if (!Triggers::doTrigger("STREAM_PUSH", payload, smp)){
|
||||
FAIL_MSG("Push from %s to %s rejected - STREAM_PUSH trigger denied the push", getConnectedHost().c_str(), streamName.c_str());
|
||||
onFinish();
|
||||
configLock.post();
|
||||
configLock.close();
|
||||
return;
|
||||
}
|
||||
}
|
||||
/*LTS-END*/
|
||||
if (IP != ""){
|
||||
if (!myConn.isAddress(IP)){
|
||||
FAIL_MSG("Push from %s to %s rejected - source host not whitelisted", getConnectedHost().c_str(), streamName.c_str());
|
||||
onFinish();
|
||||
}
|
||||
}
|
||||
}
|
||||
}else{
|
||||
FAIL_MSG("Push from %s rejected - stream '%s' not configured.", getConnectedHost().c_str(), streamName.c_str());
|
||||
onFinish();
|
||||
}
|
||||
configLock.post();
|
||||
configLock.close();
|
||||
if (!myConn){return;}//do not initialize if rejected
|
||||
|
||||
isPushing = true;
|
||||
initialize();
|
||||
if (!allowPush("")){
|
||||
isPushing = false;
|
||||
onFinish();
|
||||
return;
|
||||
}
|
||||
}
|
||||
//send a _result reply
|
||||
AMF::Object amfReply("container", AMF::AMF0_DDV_CONTAINER);
|
||||
|
|
Loading…
Add table
Reference in a new issue