From 88749d2259fcdab359b88e78587ab5a0f1df15bf Mon Sep 17 00:00:00 2001 From: Thulinma Date: Sun, 15 Jan 2017 23:41:28 +0100 Subject: [PATCH 1/2] Added sourceURI to stream metadata structures --- lib/dtsc.h | 1 + lib/dtscmeta.cpp | 19 +++++++++++++++++++ src/input/input.cpp | 5 +---- 3 files changed, 21 insertions(+), 4 deletions(-) diff --git a/lib/dtsc.h b/lib/dtsc.h index e94bfa7a..89524610 100644 --- a/lib/dtsc.h +++ b/lib/dtsc.h @@ -338,6 +338,7 @@ namespace DTSC { uint16_t version; long long int moreheader; long long int bufferWindow; + std::string sourceURI; }; /// An iterator helper for easily iterating over the parts in a Fragment. diff --git a/lib/dtscmeta.cpp b/lib/dtscmeta.cpp index c4a5bff2..143455f4 100644 --- a/lib/dtscmeta.cpp +++ b/lib/dtscmeta.cpp @@ -1311,6 +1311,7 @@ namespace DTSC { merged = source.getFlag("merged"); bufferWindow = source.getInt("buffer_window"); moreheader = source.getInt("moreheader"); + source.getString("source", sourceURI); Scan tmpTracks = source.getScan().getMember("tracks"); unsigned int num = 0; Scan tmpTrack; @@ -1330,6 +1331,7 @@ namespace DTSC { Meta::Meta(JSON::Value & meta) { vod = meta.isMember("vod") && meta["vod"]; live = meta.isMember("live") && meta["live"]; + sourceURI = meta.isMember("source") ? meta["source"].asStringRef() : ""; version = meta.isMember("version") ? meta["version"].asInt() : 0; merged = meta.isMember("merged") && meta["merged"]; bufferWindow = 0; @@ -1741,6 +1743,7 @@ namespace DTSC { } } if (version){dataLen += 18;} + if (sourceURI.size()){dataLen += 13+sourceURI.size();} return dataLen + 8; //add 8 bytes header } @@ -1770,6 +1773,11 @@ namespace DTSC { writePointer(p, "\000\007version\001", 10); writePointer(p, convertLongLong(version), 8); } + if (sourceURI.size()) { + writePointer(p, "\000\006source\002", 9); + writePointer(p, convertInt(sourceURI.size()), 4); + writePointer(p, sourceURI); + } if (bufferWindow) { writePointer(p, "\000\015buffer_window\001", 16); writePointer(p, convertLongLong(bufferWindow), 8); @@ -1807,6 +1815,11 @@ namespace DTSC { conn.SendNow("\000\007version\001", 10); conn.SendNow(convertLongLong(version), 8); } + if (sourceURI.size()) { + conn.SendNow("\000\006source\002", 9); + conn.SendNow(convertInt(sourceURI.size()), 4); + conn.SendNow(sourceURI); + } if (bufferWindow) { conn.SendNow("\000\015buffer_window\001", 16); conn.SendNow(convertLongLong(bufferWindow), 8); @@ -1894,6 +1907,9 @@ namespace DTSC { if (version) { result["version"] = (long long)version; } + if (sourceURI.size()){ + result["source"] = sourceURI; + } result["moreheader"] = moreheader; return result; } @@ -1925,6 +1941,9 @@ namespace DTSC { if (bufferWindow) { str << std::string(indent, ' ') << "Buffer Window: " << bufferWindow << std::endl; } + if (sourceURI.size()){ + str << std::string(indent, ' ') << "Source: " << sourceURI << std::endl; + } str << std::string(indent, ' ') << "More Header: " << moreheader << std::endl; } diff --git a/src/input/input.cpp b/src/input/input.cpp index ccd3e5dc..cb046198 100644 --- a/src/input/input.cpp +++ b/src/input/input.cpp @@ -157,10 +157,6 @@ namespace Mist { return 0; } - if (streamName != "") { - config->getOption("streamname") = streamName; - } - streamName = config->getString("streamname"); nProxy.streamName = streamName; if (!setup()){ @@ -173,6 +169,7 @@ namespace Mist { std::cerr << "Reading header for " << config->getString("input") << " failed." << std::endl; return 0; } + myMeta.sourceURI = config->getString("input"); parseHeader(); MEDIUM_MSG("Header parsed, %lu tracks", myMeta.tracks.size()); From 9a783a782d47a01696fd297db6ffa2f8af03b035 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Tue, 28 Feb 2017 14:05:37 +0100 Subject: [PATCH 2/2] Unified all push-in-enabled outputs into a single style/function of accepting incoming pushes --- src/input/input.cpp | 2 +- src/output/output.cpp | 31 +++++++++++++++++++++++++++++++ src/output/output.h | 1 + src/output/output_rtmp.cpp | 34 ++++++---------------------------- 4 files changed, 39 insertions(+), 29 deletions(-) diff --git a/src/input/input.cpp b/src/input/input.cpp index cb046198..1b7378cb 100644 --- a/src/input/input.cpp +++ b/src/input/input.cpp @@ -297,7 +297,7 @@ namespace Mist { pullLock.unlink(); return; } - if (!Util::startInput(streamName, "push://")) {//manually override stream url to start the buffer + if (!Util::startInput(streamName, "push://INTERNAL_ONLY:"+config->getString("input"))) {//manually override stream url to start the buffer pullLock.post(); pullLock.close(); pullLock.unlink(); diff --git a/src/output/output.cpp b/src/output/output.cpp index 2e18d0d2..f267d52b 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -1088,5 +1088,36 @@ namespace Mist{ return true; } + /// Checks if the set streamName allows pushes from this connector/IP/password combination. + /// Runs all appropriate triggers and checks. + /// Returns true if the push should continue, false otherwise. + bool Output::allowPush(const std::string & passwd){ + std::string strmSource; + + // Initialize the stream source if needed, connect to it + initialize(); + //pull the source setting from metadata + strmSource = myMeta.sourceURI; + + if (!strmSource.size()){ + FAIL_MSG("Push rejected - stream %s not configured", streamName.c_str()); + return false; + } + if (strmSource.substr(0, 7) != "push://"){ + FAIL_MSG("Push rejected - stream %s not a push-able stream. (%s != push://*)", streamName.c_str(), strmSource.c_str()); + return false; + } + + std::string source = strmSource.substr(7); + std::string IP = source.substr(0, source.find('@')); + if (IP != ""){ + if (!myConn.isAddress(IP)){ + FAIL_MSG("Push from %s to %s rejected - source host not whitelisted", getConnectedHost().c_str(), streamName.c_str()); + return false; + } + } + return true; + } + } diff --git a/src/output/output.h b/src/output/output.h index 518f868e..ec7883df 100644 --- a/src/output/output.h +++ b/src/output/output.h @@ -111,6 +111,7 @@ namespace Mist { bool sentHeader;///< If false, triggers sendHeader if parseData is true. std::map bookKeeping; + bool allowPush(const std::string & passwd); }; } diff --git a/src/output/output_rtmp.cpp b/src/output/output_rtmp.cpp index e9135864..de2cf5c3 100644 --- a/src/output/output_rtmp.cpp +++ b/src/output/output_rtmp.cpp @@ -586,35 +586,13 @@ namespace Mist { } Util::sanitizeName(streamName); - //pull the server configuration - IPC::sharedPage serverCfg(SHM_CONF, DEFAULT_CONF_PAGE_SIZE); ///< Contains server configuration and capabilities - IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); - configLock.wait(); - - DTSC::Scan streamCfg = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("streams").getMember(streamName); - if (streamCfg){ - if (streamCfg.getMember("source").asString().substr(0, 7) != "push://"){ - FAIL_MSG("Push rejected - stream %s not a push-able stream. (%s != push://*)", streamName.c_str(), streamCfg.getMember("source").asString().c_str()); - onFinish(); - }else{ - std::string source = streamCfg.getMember("source").asString().substr(7); - std::string IP = source.substr(0, source.find('@')); - if (IP != ""){ - if (!myConn.isAddress(IP)){ - FAIL_MSG("Push from %s to %s rejected - source host not whitelisted", getConnectedHost().c_str(), streamName.c_str()); - onFinish(); - } - } - } - }else{ - FAIL_MSG("Push from %s rejected - stream '%s' not configured.", getConnectedHost().c_str(), streamName.c_str()); - onFinish(); - } - configLock.post(); - configLock.close(); - if (!myConn){return;}//do not initialize if rejected + isPushing = true; - initialize(); + if (!allowPush("")){ + isPushing = false; + onFinish(); + return; + } } //send a _result reply AMF::Object amfReply("container", AMF::AMF0_DDV_CONTAINER);