Implemented RTMP push output.

This commit is contained in:
Thulinma 2016-05-10 14:30:39 +02:00
parent 8ddfb000df
commit fcdf788596
4 changed files with 189 additions and 102 deletions

View file

@ -10,28 +10,138 @@
namespace Mist {
OutRTMP::OutRTMP(Socket::Connection & conn) : Output(conn) {
setBlocking(true);
while (!conn.Received().available(1537) && conn.connected()) {
conn.spool();
}
if (!conn){
return;
}
RTMPStream::handshake_in.append(conn.Received().remove(1537));
RTMPStream::rec_cnt += 1537;
if (config->getString("target").size() && config->getString("target").substr(0, 7) == "rtmp://"){
streamName = config->getString("streamname");
std::string pushStr= config->getString("target");
pushStr = pushStr.substr(7);
std::string host, app = "default", streamOut = "default";
int port = 1935;
if (RTMPStream::doHandshake()) {
conn.SendNow(RTMPStream::handshake_out);
while (!conn.Received().available(1536) && conn.connected()) {
size_t slash = pushStr.find('/');
if (slash != std::string::npos){
app = pushStr.substr(slash+1, std::string::npos);
host = pushStr.substr(0, slash);
}else{
host = pushStr;
}
size_t colon = host.find(':');
if (colon != std::string::npos && colon != 0 && colon != host.size()) {
port = atoi(host.substr(colon + 1, std::string::npos).c_str());
host = host.substr(0, colon);
}
slash = app.find('/');
if (slash != std::string::npos){
streamOut = app.substr(slash+1, std::string::npos);
app = app.substr(0, slash);
}else{
streamOut = app;
}
INFO_MSG("About to push stream %s out. Host: %s, port: %d, app: %s, stream: %s", streamName.c_str(), host.c_str(), port, app.c_str(), streamOut.c_str());
myConn = Socket::Connection(host, port, false);
if (!myConn){
FAIL_MSG("Could not connect to %s:%d!", host.c_str(), port);
return;
}
//do handshake
myConn.SendNow("\003", 1);//protocol version. Always 3
char * temp = (char*)malloc(3072);
if (!temp){
myConn.close();
return;
}
*((uint32_t *)temp) = 0; //time zero
*(((uint32_t *)(temp + 4))) = htonl(0x01020304); //version 1 2 3 4
for (int i = 8; i < 3072; ++i) {
temp[i] = FILLER_DATA[i % sizeof(FILLER_DATA)];
} //"random" data
myConn.SendNow(temp, 3072);
free(temp);
setBlocking(true);
while (!myConn.Received().available(3073) && myConn.connected() && config->is_active) {
myConn.spool();
}
if (!myConn || !config->is_active){return;}
conn.Received().remove(3073);
RTMPStream::rec_cnt += 3073;
RTMPStream::snd_cnt += 3073;
setBlocking(false);
INFO_MSG("Push out handshake completed");
{
AMF::Object amfReply("container", AMF::AMF0_DDV_CONTAINER);
amfReply.addContent(AMF::Object("", "connect")); //command
amfReply.addContent(AMF::Object("", (double)1)); //transaction ID
amfReply.addContent(AMF::Object("")); //options
amfReply.getContentP(2)->addContent(AMF::Object("app", app));
amfReply.getContentP(2)->addContent(AMF::Object("type", "nonprivate"));
amfReply.getContentP(2)->addContent(AMF::Object("flashVer", "FMLE/3.0 (compatible; MistServer/" PACKAGE_VERSION "/" RELEASE ")"));
amfReply.getContentP(2)->addContent(AMF::Object("tcUrl", "rtmp://" + host + "/" + app));
sendCommand(amfReply, 20, 0);
}
RTMPStream::chunk_snd_max = 4096;
myConn.SendNow(RTMPStream::SendCTL(1, RTMPStream::chunk_snd_max)); //send chunk size max (msg 1)
{
AMF::Object amfReply("container", AMF::AMF0_DDV_CONTAINER);
amfReply.addContent(AMF::Object("", "releaseStream")); //command
amfReply.addContent(AMF::Object("", (double)2)); //transaction ID
amfReply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL)); //options
amfReply.addContent(AMF::Object("", streamOut)); //stream name
sendCommand(amfReply, 20, 0);
}
{
AMF::Object amfReply("container", AMF::AMF0_DDV_CONTAINER);
amfReply.addContent(AMF::Object("", "FCPublish")); //command
amfReply.addContent(AMF::Object("", (double)3)); //transaction ID
amfReply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL)); //options
amfReply.addContent(AMF::Object("", streamOut)); //stream name
sendCommand(amfReply, 20, 0);
}
{
AMF::Object amfReply("container", AMF::AMF0_DDV_CONTAINER);
amfReply.addContent(AMF::Object("", "createStream")); //command
amfReply.addContent(AMF::Object("", (double)4)); //transaction ID
amfReply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL)); //options
sendCommand(amfReply, 20, 0);
}
{
AMF::Object amfReply("container", AMF::AMF0_DDV_CONTAINER);
amfReply.addContent(AMF::Object("", "publish")); //command
amfReply.addContent(AMF::Object("", (double)5)); //transaction ID
amfReply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL)); //options
amfReply.addContent(AMF::Object("", streamOut)); //stream name
amfReply.addContent(AMF::Object("", "live")); //stream name
sendCommand(amfReply, 20, 1);
}
INFO_MSG("Publish starting");
parseData = true;
}else{
setBlocking(true);
while (!conn.Received().available(1537) && conn.connected() && config->is_active) {
conn.spool();
}
conn.Received().remove(1536);
RTMPStream::rec_cnt += 1536;
DEBUG_MSG(DLVL_HIGH, "Handshake success!");
} else {
DEBUG_MSG(DLVL_DEVEL, "Handshake fail!");
if (!conn || !config->is_active){
return;
}
RTMPStream::handshake_in.append(conn.Received().remove(1537));
RTMPStream::rec_cnt += 1537;
if (RTMPStream::doHandshake()) {
conn.SendNow(RTMPStream::handshake_out);
while (!conn.Received().available(1536) && conn.connected() && config->is_active) {
conn.spool();
}
conn.Received().remove(1536);
RTMPStream::rec_cnt += 1536;
HIGH_MSG("Handshake success!");
} else {
MEDIUM_MSG("Handshake fail!");
}
setBlocking(false);
}
setBlocking(false);
maxSkipAhead = 1500;
minSkipAhead = 500;
isPushing = false;
@ -39,6 +149,10 @@ namespace Mist {
OutRTMP::~OutRTMP() {}
bool OutRTMP::listenMode(){
return !(config->getString("target").size());
}
unsigned int OutRTMP::needsPlayableKeys(){
if (isPushing){
return 0;
@ -128,6 +242,15 @@ namespace Mist {
capa["methods"][0u]["player_url"] = "/flashplayer.swf";
cfg->addConnectorOptions(1935, capa);
config = cfg;
capa["push_urls"].append("rtmp://*");
JSON::Value opt;
opt["arg"] = "string";
opt["default"] = "";
opt["arg_num"] = 1ll;
opt["help"] = "Target rtmp:// URL to push out towards.";
cfg->addOption("target", opt);
cfg->addOption("streamname", JSON::fromString("{\"arg\":\"string\",\"short\":\"s\",\"long\":\"stream\",\"help\":\"The name of the stream to push out, when pushing out.\"}"));
}
void OutRTMP::sendNext() {
@ -322,9 +445,7 @@ namespace Mist {
///\param messageType The type of message.
///\param streamId The ID of the AMF stream.
void OutRTMP::sendCommand(AMF::Object & amfReply, int messageType, int streamId) {
#if DEBUG >= 8
std::cerr << amfReply.Print() << std::endl;
#endif
HIGH_MSG("Sending: %s", amfReply.Print().c_str());
if (messageType == 17) {
myConn.SendNow(RTMPStream::SendChunk(3, messageType, streamId, (char)0 + amfReply.Pack()));
} else {
@ -352,38 +473,13 @@ namespace Mist {
/// connected client host
/// ~~~~~~~~~~~~~~~
void OutRTMP::parseAMFCommand(AMF::Object & amfData, int messageType, int streamId) {
#if DEBUG >= 5
fprintf(stderr, "Received command: %s\n", amfData.Print().c_str());
#endif
#if DEBUG >= 8
fprintf(stderr, "AMF0 command: %s\n", amfData.getContentP(0)->StrValue().c_str());
#endif
MEDIUM_MSG("Received command: %s", amfData.Print().c_str());
HIGH_MSG("AMF0 command: %s", amfData.getContentP(0)->StrValue().c_str());
if (amfData.getContentP(0)->StrValue() == "connect") {
double objencoding = 0;
if (amfData.getContentP(2)->getContentP("objectEncoding")) {
objencoding = amfData.getContentP(2)->getContentP("objectEncoding")->NumValue();
}
#if DEBUG >= 6
int tmpint;
if (amfData.getContentP(2)->getContentP("videoCodecs")) {
tmpint = (int)amfData.getContentP(2)->getContentP("videoCodecs")->NumValue();
if (tmpint & 0x04) {
fprintf(stderr, "Sorensen video support detected\n");
}
if (tmpint & 0x80) {
fprintf(stderr, "H264 video support detected\n");
}
}
if (amfData.getContentP(2)->getContentP("audioCodecs")) {
tmpint = (int)amfData.getContentP(2)->getContentP("audioCodecs")->NumValue();
if (tmpint & 0x04) {
fprintf(stderr, "MP3 audio support detected\n");
}
if (tmpint & 0x400) {
fprintf(stderr, "AAC audio support detected\n");
}
}
#endif
app_name = amfData.getContentP(2)->getContentP("tcUrl")->StrValue();
reqUrl = app_name;//LTS
app_name = app_name.substr(app_name.find('/', 7) + 1);
@ -538,7 +634,7 @@ namespace Mist {
DTSC::Scan streamCfg = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("streams").getMember(smp);
if (streamCfg){
if (streamCfg.getMember("source").asString().substr(0, 7) != "push://"){
DEBUG_MSG(DLVL_FAIL, "Push rejected - stream %s not a push-able stream. (%s != push://*)", streamName.c_str(), streamCfg.getMember("source").asString().c_str());
FAIL_MSG("Push rejected - stream %s not a push-able stream. (%s != push://*)", streamName.c_str(), streamCfg.getMember("source").asString().c_str());
myConn.close();
}else{
std::string source = streamCfg.getMember("source").asString().substr(7);
@ -549,10 +645,10 @@ namespace Mist {
password = source.substr(source.find('@')+1);
if (password != ""){
if (password == app_name){
DEBUG_MSG(DLVL_DEVEL, "Password accepted - ignoring IP settings.");
INFO_MSG("Password accepted - ignoring IP settings.");
IP = "";
}else{
DEBUG_MSG(DLVL_DEVEL, "Password rejected - checking IP.");
INFO_MSG("Password rejected - checking IP.");
if (IP == ""){
IP = "deny-all.invalid";
}
@ -562,7 +658,7 @@ namespace Mist {
if(Triggers::shouldTrigger("STREAM_PUSH", smp)){
std::string payload = streamName+"\n" + getConnectedHost() +"\n"+capa["name"].asStringRef()+"\n"+reqUrl;
if (!Triggers::doTrigger("STREAM_PUSH", payload, smp)){
DEBUG_MSG(DLVL_FAIL, "Push from %s to %s rejected - STREAM_PUSH trigger denied the push", getConnectedHost().c_str(), streamName.c_str());
FAIL_MSG("Push from %s to %s rejected - STREAM_PUSH trigger denied the push", getConnectedHost().c_str(), streamName.c_str());
myConn.close();
configLock.post();
configLock.close();
@ -572,13 +668,13 @@ namespace Mist {
/*LTS-END*/
if (IP != ""){
if (!myConn.isAddress(IP)){
DEBUG_MSG(DLVL_FAIL, "Push from %s to %s rejected - source host not whitelisted", getConnectedHost().c_str(), streamName.c_str());
FAIL_MSG("Push from %s to %s rejected - source host not whitelisted", getConnectedHost().c_str(), streamName.c_str());
myConn.close();
}
}
}
}else{
DEBUG_MSG(DLVL_FAIL, "Push from %s rejected - stream '%s' not configured.", getConnectedHost().c_str(), streamName.c_str());
FAIL_MSG("Push from %s rejected - stream '%s' not configured.", getConnectedHost().c_str(), streamName.c_str());
myConn.close();
}
configLock.post();
@ -773,16 +869,22 @@ namespace Mist {
}
return;
} //seek
if (amfData.getContentP(0)->StrValue() == "_error") {
WARN_MSG("Received error response: %s", amfData.Print().c_str());
return;
}
if ((amfData.getContentP(0)->StrValue() == "_result") || (amfData.getContentP(0)->StrValue() == "onFCPublish") || (amfData.getContentP(0)->StrValue() == "onStatus")) {
//Results are ignored. We don't really care.
return;
}
#if DEBUG >= 2
fprintf(stderr, "AMF0 command not processed!\n%s\n", amfData.Print().c_str());
#endif
WARN_MSG("AMF0 command not processed: %s", amfData.Print().c_str());
//send a _result reply
AMF::Object amfReply("container", AMF::AMF0_DDV_CONTAINER);
amfReply.addContent(AMF::Object("", "_error")); //result success
amfReply.addContent(amfData.getContent(1)); //same transaction ID
amfReply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL)); //null - command info
amfReply.addContent(AMF::Object("Command not implemented or recognized", "")); //stream ID?
amfReply.addContent(AMF::Object("", amfData.getContentP(0)->StrValue())); //null - command info
amfReply.addContent(AMF::Object("", "Command not implemented or recognized")); //stream ID?
sendCommand(amfReply, messageType, streamId);
} //parseAMFCommand
@ -809,9 +911,7 @@ namespace Mist {
switch (next.msg_type_id) {
case 0: //does not exist
#if DEBUG >= 2
fprintf(stderr, "UNKN: Received a zero-type message. Possible data corruption? Aborting!\n");
#endif
WARN_MSG("UNKN: Received a zero-type message. Possible data corruption? Aborting!");
while (inputBuffer.size()) {
inputBuffer.get().clear();
}
@ -820,20 +920,14 @@ namespace Mist {
break; //happens when connection breaks unexpectedly
case 1: //set chunk size
RTMPStream::chunk_rec_max = ntohl(*(int *)next.data.c_str());
#if DEBUG >= 5
fprintf(stderr, "CTRL: Set chunk size: %i\n", RTMPStream::chunk_rec_max);
#endif
MEDIUM_MSG("CTRL: Set chunk size: %i", RTMPStream::chunk_rec_max);
break;
case 2: //abort message - we ignore this one
#if DEBUG >= 5
fprintf(stderr, "CTRL: Abort message\n");
#endif
MEDIUM_MSG("CTRL: Abort message");
//4 bytes of stream id to drop
break;
case 3: //ack
#if DEBUG >= 8
fprintf(stderr, "CTRL: Acknowledgement\n");
#endif
VERYHIGH_MSG("CTRL: Acknowledgement");
RTMPStream::snd_window_at = ntohl(*(int *)next.data.c_str());
RTMPStream::snd_window_at = RTMPStream::snd_cnt;
break;
@ -848,49 +942,43 @@ namespace Mist {
//6 = pingrequest, 4 bytes data
//7 = pingresponse, 4 bytes data
//we don't need to process this
#if DEBUG >= 5
short int ucmtype = ntohs(*(short int *)next.data.c_str());
switch (ucmtype) {
case 0:
fprintf(stderr, "CTRL: UCM StreamBegin %i\n", ntohl(*((int *)(next.data.c_str() + 2))));
MEDIUM_MSG("CTRL: UCM StreamBegin %i", ntohl(*((int *)(next.data.c_str() + 2))));
break;
case 1:
fprintf(stderr, "CTRL: UCM StreamEOF %i\n", ntohl(*((int *)(next.data.c_str() + 2))));
MEDIUM_MSG("CTRL: UCM StreamEOF %i", ntohl(*((int *)(next.data.c_str() + 2))));
break;
case 2:
fprintf(stderr, "CTRL: UCM StreamDry %i\n", ntohl(*((int *)(next.data.c_str() + 2))));
MEDIUM_MSG("CTRL: UCM StreamDry %i", ntohl(*((int *)(next.data.c_str() + 2))));
break;
case 3:
fprintf(stderr, "CTRL: UCM SetBufferLength %i %i\n", ntohl(*((int *)(next.data.c_str() + 2))), ntohl(*((int *)(next.data.c_str() + 6))));
MEDIUM_MSG("CTRL: UCM SetBufferLength %i %i", ntohl(*((int *)(next.data.c_str() + 2))), ntohl(*((int *)(next.data.c_str() + 6))));
break;
case 4:
fprintf(stderr, "CTRL: UCM StreamIsRecorded %i\n", ntohl(*((int *)(next.data.c_str() + 2))));
MEDIUM_MSG("CTRL: UCM StreamIsRecorded %i", ntohl(*((int *)(next.data.c_str() + 2))));
break;
case 6:
fprintf(stderr, "CTRL: UCM PingRequest %i\n", ntohl(*((int *)(next.data.c_str() + 2))));
MEDIUM_MSG("CTRL: UCM PingRequest %i", ntohl(*((int *)(next.data.c_str() + 2))));
break;
case 7:
fprintf(stderr, "CTRL: UCM PingResponse %i\n", ntohl(*((int *)(next.data.c_str() + 2))));
MEDIUM_MSG("CTRL: UCM PingResponse %i", ntohl(*((int *)(next.data.c_str() + 2))));
break;
default:
fprintf(stderr, "CTRL: UCM Unknown (%hi)\n", ucmtype);
MEDIUM_MSG("CTRL: UCM Unknown (%hi)", ucmtype);
break;
}
#endif
}
break;
case 5: //window size of other end
#if DEBUG >= 5
fprintf(stderr, "CTRL: Window size\n");
#endif
MEDIUM_MSG("CTRL: Window size");
RTMPStream::rec_window_size = ntohl(*(int *)next.data.c_str());
RTMPStream::rec_window_at = RTMPStream::rec_cnt;
myConn.SendNow(RTMPStream::SendCTL(3, RTMPStream::rec_cnt)); //send ack (msg 3)
break;
case 6:
#if DEBUG >= 5
fprintf(stderr, "CTRL: Set peer bandwidth\n");
#endif
MEDIUM_MSG("CTRL: Set peer bandwidth");
//4 bytes window size, 1 byte limit type (ignored)
RTMPStream::snd_window_size = ntohl(*(int *)next.data.c_str());
myConn.SendNow(RTMPStream::SendCTL(5, RTMPStream::snd_window_size)); //send window acknowledgement size (msg 5)
@ -925,21 +1013,19 @@ namespace Mist {
break;
}
case 15:
DEBUG_MSG(DLVL_MEDIUM, "Received AMF3 data message");
MEDIUM_MSG("Received AMF3 data message");
break;
case 16:
DEBUG_MSG(DLVL_MEDIUM, "Received AMF3 shared object");
MEDIUM_MSG("Received AMF3 shared object");
break;
case 17: {
DEBUG_MSG(DLVL_MEDIUM, "Received AMF3 command message");
MEDIUM_MSG("Received AMF3 command message");
if (next.data[0] != 0) {
next.data = next.data.substr(1);
amf3data = AMF::parse3(next.data);
#if DEBUG >= 5
amf3data.Print();
#endif
MEDIUM_MSG("AMF3: %s", amf3data.Print().c_str());
} else {
DEBUG_MSG(DLVL_MEDIUM, "Received AMF3-0 command message");
MEDIUM_MSG("Received AMF3-0 command message");
next.data = next.data.substr(1);
amfdata = AMF::parse(next.data);
parseAMFCommand(amfdata, 17, next.msg_stream_id);
@ -947,7 +1033,7 @@ namespace Mist {
}
break;
case 19:
DEBUG_MSG(DLVL_MEDIUM, "Received AMF0 shared object");
MEDIUM_MSG("Received AMF0 shared object");
break;
case 20: { //AMF0 command message
amfdata = AMF::parse(next.data);
@ -955,10 +1041,10 @@ namespace Mist {
}
break;
case 22:
DEBUG_MSG(DLVL_MEDIUM, "Received aggregate message");
MEDIUM_MSG("Received aggregate message");
break;
default:
DEBUG_MSG(DLVL_FAIL, "Unknown chunk received! Probably protocol corruption, stopping parsing of incoming data.");
FAIL_MSG("Unknown chunk received! Probably protocol corruption, stopping parsing of incoming data.");
break;
}
}

View file

@ -15,6 +15,7 @@ namespace Mist {
void sendNext();
void sendHeader();
unsigned int needsPlayableKeys();
static bool listenMode();
protected:
bool isPushing;
void parseVars(std::string data);