diff --git a/lib/dtsc.h b/lib/dtsc.h index 2a401e1d..318f15b2 100644 --- a/lib/dtsc.h +++ b/lib/dtsc.h @@ -365,6 +365,7 @@ namespace DTSC { uint16_t version; long long int moreheader; long long int bufferWindow; + std::string sourceURI; }; /// An iterator helper for easily iterating over the parts in a Fragment. diff --git a/lib/dtscmeta.cpp b/lib/dtscmeta.cpp index c3dfa4da..89f3645c 100644 --- a/lib/dtscmeta.cpp +++ b/lib/dtscmeta.cpp @@ -1365,6 +1365,7 @@ namespace DTSC { merged = source.getFlag("merged"); bufferWindow = source.getInt("buffer_window"); moreheader = source.getInt("moreheader"); + source.getString("source", sourceURI); Scan tmpTracks = source.getScan().getMember("tracks"); unsigned int num = 0; Scan tmpTrack; @@ -1384,6 +1385,7 @@ namespace DTSC { Meta::Meta(JSON::Value & meta) { vod = meta.isMember("vod") && meta["vod"]; live = meta.isMember("live") && meta["live"]; + sourceURI = meta.isMember("source") ? meta["source"].asStringRef() : ""; version = meta.isMember("version") ? meta["version"].asInt() : 0; merged = meta.isMember("merged") && meta["merged"]; bufferWindow = 0; @@ -1826,6 +1828,7 @@ namespace DTSC { } } if (version){dataLen += 18;} + if (sourceURI.size()){dataLen += 13+sourceURI.size();} return dataLen + 8; //add 8 bytes header } @@ -1855,6 +1858,11 @@ namespace DTSC { writePointer(p, "\000\007version\001", 10); writePointer(p, convertLongLong(version), 8); } + if (sourceURI.size()) { + writePointer(p, "\000\006source\002", 9); + writePointer(p, convertInt(sourceURI.size()), 4); + writePointer(p, sourceURI); + } if (bufferWindow) { writePointer(p, "\000\015buffer_window\001", 16); writePointer(p, convertLongLong(bufferWindow), 8); @@ -1892,6 +1900,11 @@ namespace DTSC { conn.SendNow("\000\007version\001", 10); conn.SendNow(convertLongLong(version), 8); } + if (sourceURI.size()) { + conn.SendNow("\000\006source\002", 9); + conn.SendNow(convertInt(sourceURI.size()), 4); + conn.SendNow(sourceURI); + } if (bufferWindow) { conn.SendNow("\000\015buffer_window\001", 16); conn.SendNow(convertLongLong(bufferWindow), 8); @@ -1987,6 +2000,9 @@ namespace DTSC { if (version) { result["version"] = (long long)version; } + if (sourceURI.size()){ + result["source"] = sourceURI; + } result["moreheader"] = moreheader; return result; } @@ -2018,6 +2034,9 @@ namespace DTSC { if (bufferWindow) { str << std::string(indent, ' ') << "Buffer Window: " << bufferWindow << std::endl; } + if (sourceURI.size()){ + str << std::string(indent, ' ') << "Source: " << sourceURI << std::endl; + } str << std::string(indent, ' ') << "More Header: " << moreheader << std::endl; } diff --git a/src/input/input.cpp b/src/input/input.cpp index af2919b8..33890130 100644 --- a/src/input/input.cpp +++ b/src/input/input.cpp @@ -173,10 +173,6 @@ namespace Mist { return 0; } - if (streamName != "") { - config->getOption("streamname") = streamName; - } - streamName = config->getString("streamname"); nProxy.streamName = streamName; if (!setup()) { @@ -189,6 +185,7 @@ namespace Mist { std::cerr << "Reading header for " << config->getString("input") << " failed." << std::endl; return 0; } + myMeta.sourceURI = config->getString("input"); parseHeader(); MEDIUM_MSG("Header parsed, %lu tracks", myMeta.tracks.size()); @@ -357,7 +354,7 @@ namespace Mist { pullLock.unlink(); return; } - if (!Util::startInput(streamName, "push://")) {//manually override stream url to start the buffer + if (!Util::startInput(streamName, "push://INTERNAL_ONLY:"+config->getString("input"))) {//manually override stream url to start the buffer pullLock.post(); pullLock.close(); pullLock.unlink(); diff --git a/src/output/output.cpp b/src/output/output.cpp index c7e5c1d5..ae374bb5 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -1272,5 +1272,36 @@ namespace Mist{ return true; } + /// Checks if the set streamName allows pushes from this connector/IP/password combination. + /// Runs all appropriate triggers and checks. + /// Returns true if the push should continue, false otherwise. + bool Output::allowPush(const std::string & passwd){ + std::string strmSource; + + // Initialize the stream source if needed, connect to it + initialize(); + //pull the source setting from metadata + strmSource = myMeta.sourceURI; + + if (!strmSource.size()){ + FAIL_MSG("Push rejected - stream %s not configured", streamName.c_str()); + return false; + } + if (strmSource.substr(0, 7) != "push://"){ + FAIL_MSG("Push rejected - stream %s not a push-able stream. (%s != push://*)", streamName.c_str(), strmSource.c_str()); + return false; + } + + std::string source = strmSource.substr(7); + std::string IP = source.substr(0, source.find('@')); + if (IP != ""){ + if (!myConn.isAddress(IP)){ + FAIL_MSG("Push from %s to %s rejected - source host not whitelisted", getConnectedHost().c_str(), streamName.c_str()); + return false; + } + } + return true; + } + } diff --git a/src/output/output.h b/src/output/output.h index 0a730acf..5267a418 100644 --- a/src/output/output.h +++ b/src/output/output.h @@ -127,6 +127,7 @@ namespace Mist { std::map bookKeeping; virtual bool isRecording(){return false;}; + bool allowPush(const std::string & passwd); }; } diff --git a/src/output/output_rtmp.cpp b/src/output/output_rtmp.cpp index 8adee9cf..50b1a162 100644 --- a/src/output/output_rtmp.cpp +++ b/src/output/output_rtmp.cpp @@ -777,63 +777,13 @@ namespace Mist { } Util::sanitizeName(streamName); - //pull the server configuration - std::string smp = streamName.substr(0,(streamName.find_first_of("+ "))); - IPC::sharedPage serverCfg(SHM_CONF, DEFAULT_CONF_PAGE_SIZE); ///< Contains server configuration and capabilities - IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); - configLock.wait(); - - DTSC::Scan streamCfg = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("streams").getMember(smp); - if (streamCfg){ - if (streamCfg.getMember("source").asString().substr(0, 7) != "push://"){ - FAIL_MSG("Push rejected - stream %s not a push-able stream. (%s != push://*)", streamName.c_str(), streamCfg.getMember("source").asString().c_str()); - onFinish(); - }else{ - std::string source = streamCfg.getMember("source").asString().substr(7); - std::string IP = source.substr(0, source.find('@')); - /*LTS-START*/ - std::string password; - if (source.find('@') != std::string::npos){ - password = source.substr(source.find('@')+1); - if (password != ""){ - if (password == app_name){ - INFO_MSG("Password accepted - ignoring IP settings."); - IP = ""; - }else{ - INFO_MSG("Password rejected - checking IP."); - if (IP == ""){ - IP = "deny-all.invalid"; - } - } - } - } - if(Triggers::shouldTrigger("STREAM_PUSH", smp)){ - std::string payload = streamName+"\n" + getConnectedHost() +"\n"+capa["name"].asStringRef()+"\n"+reqUrl; - if (!Triggers::doTrigger("STREAM_PUSH", payload, smp)){ - FAIL_MSG("Push from %s to %s rejected - STREAM_PUSH trigger denied the push", getConnectedHost().c_str(), streamName.c_str()); - onFinish(); - configLock.post(); - configLock.close(); - return; - } - } - /*LTS-END*/ - if (IP != ""){ - if (!myConn.isAddress(IP)){ - FAIL_MSG("Push from %s to %s rejected - source host not whitelisted", getConnectedHost().c_str(), streamName.c_str()); - onFinish(); - } - } - } - }else{ - FAIL_MSG("Push from %s rejected - stream '%s' not configured.", getConnectedHost().c_str(), streamName.c_str()); - onFinish(); - } - configLock.post(); - configLock.close(); - if (!myConn){return;}//do not initialize if rejected + isPushing = true; - initialize(); + if (!allowPush("")){ + isPushing = false; + onFinish(); + return; + } } //send a _result reply AMF::Object amfReply("container", AMF::AMF0_DDV_CONTAINER);