RTMP rate-limiting implemented, RTMP push now supports leaving off the target stream name.

This commit is contained in:
Thulinma 2016-05-16 15:17:03 +02:00
parent 47bfebb339
commit 219e326048
2 changed files with 27 additions and 4 deletions

View file

@ -10,11 +10,12 @@
namespace Mist { namespace Mist {
OutRTMP::OutRTMP(Socket::Connection & conn) : Output(conn) { OutRTMP::OutRTMP(Socket::Connection & conn) : Output(conn) {
maxbps = config->getInteger("maxkbps")*128;
if (config->getString("target").size() && config->getString("target").substr(0, 7) == "rtmp://"){ if (config->getString("target").size() && config->getString("target").substr(0, 7) == "rtmp://"){
streamName = config->getString("streamname"); streamName = config->getString("streamname");
std::string pushStr= config->getString("target"); std::string pushStr= config->getString("target");
pushStr = pushStr.substr(7); pushStr = pushStr.substr(7);
std::string host, app = "default", streamOut = "default"; std::string host, app = "default", streamOut = streamName;
int port = 1935; int port = 1935;
size_t slash = pushStr.find('/'); size_t slash = pushStr.find('/');
@ -35,8 +36,9 @@ namespace Mist {
if (slash != std::string::npos){ if (slash != std::string::npos){
streamOut = app.substr(slash+1, std::string::npos); streamOut = app.substr(slash+1, std::string::npos);
app = app.substr(0, slash); app = app.substr(0, slash);
}else{ if (!streamOut.size()){
streamOut = app; streamOut = streamName;
}
} }
INFO_MSG("About to push stream %s out. Host: %s, port: %d, app: %s, stream: %s", streamName.c_str(), host.c_str(), port, app.c_str(), streamOut.c_str()); INFO_MSG("About to push stream %s out. Host: %s, port: %d, app: %s, stream: %s", streamName.c_str(), host.c_str(), port, app.c_str(), streamOut.c_str());
@ -232,7 +234,7 @@ namespace Mist {
void OutRTMP::init(Util::Config * cfg) { void OutRTMP::init(Util::Config * cfg) {
Output::init(cfg); Output::init(cfg);
capa["name"] = "RTMP"; capa["name"] = "RTMP";
capa["desc"] = "Enables the RTMP protocol which is used by Adobe Flash Player."; capa["desc"] = "Enables ingest and output over Adobe's RTMP protocol.";
capa["deps"] = ""; capa["deps"] = "";
capa["url_rel"] = "/play/$"; capa["url_rel"] = "/play/$";
capa["codecs"][0u][0u].append("H264"); capa["codecs"][0u][0u].append("H264");
@ -254,6 +256,12 @@ namespace Mist {
capa["methods"][0u]["type"] = "flash/10"; capa["methods"][0u]["type"] = "flash/10";
capa["methods"][0u]["priority"] = 6ll; capa["methods"][0u]["priority"] = 6ll;
capa["methods"][0u]["player_url"] = "/flashplayer.swf"; capa["methods"][0u]["player_url"] = "/flashplayer.swf";
capa["optional"]["maxkbps"]["name"] = "Max. kbps";
capa["optional"]["maxkbps"]["help"] = "Maximum bitrate to allow in the ingest direction, in kilobits per second.";
capa["optional"]["maxkbps"]["option"] = "--maxkbps";
capa["optional"]["maxkbps"]["short"] = "K";
capa["optional"]["maxkbps"]["default"] = (long long)20000;
capa["optional"]["maxkbps"]["type"] = "uint";
cfg->addConnectorOptions(1935, capa); cfg->addConnectorOptions(1935, capa);
config = cfg; config = cfg;
capa["push_urls"].append("rtmp://*"); capa["push_urls"].append("rtmp://*");
@ -450,6 +458,19 @@ namespace Mist {
sentHeader = true; sentHeader = true;
} }
void OutRTMP::requestHandler(){
//If needed, slow down the reading to a rate of maxbps on average
static bool slowWarned = false;
while (maxbps && (Util::epoch() - myConn.connTime()) && myConn.dataDown() / (Util::epoch() - myConn.connTime()) > maxbps){
if (!slowWarned){
WARN_MSG("Slowing down connection from %s because rate of %llukbps > %llukbps", getConnectedHost().c_str(), (myConn.dataDown() / (Util::epoch() - myConn.connTime())) / 128, maxbps / 128);
slowWarned = true;
}
Util::sleep(250);
}
Output::requestHandler();
}
void OutRTMP::onRequest() { void OutRTMP::onRequest() {
parseChunk(myConn.Received()); parseChunk(myConn.Received());
} }

View file

@ -16,8 +16,10 @@ namespace Mist {
void sendHeader(); void sendHeader();
bool isReadyForPlay(); bool isReadyForPlay();
static bool listenMode(); static bool listenMode();
void requestHandler();
protected: protected:
bool isPushing; bool isPushing;
unsigned int maxbps;
void parseVars(std::string data); void parseVars(std::string data);
std::string app_name; std::string app_name;
void parseChunk(Socket::Buffer & inputBuffer); void parseChunk(Socket::Buffer & inputBuffer);