RTMP Connector upgrade to DTSC - push mode doesn't convert to DTSC yet (it will tomorrow) and it is still untested - but should work.

This commit is contained in:
Thulinma 2012-04-15 01:54:39 +02:00
parent b0880215df
commit f9de7f9a64
4 changed files with 254 additions and 516 deletions

View file

@ -1,48 +0,0 @@
[BuckBunny Entry1]
trackinfo
timescale
length
language
sampledescription
sampletype
timescale
length
language
sampledescription
sampletype
audiochannels
audiosamplerate
videoframerate
aacaot
avclevel
avcprofile
audiocodecid
videocodecid
width
height
frameWidth
frameHeight
displayWidth
displayHeight
moovposition
duration
[FIFA Entry]
duration
width
height
videodatarate
framerate
videocodecid
audiodatarate
audiosamplerate
audiosamplesize
stero
audiocodecid
filesize

View file

@ -27,7 +27,9 @@ namespace Connector_RTMP{
Socket::Connection Socket; ///< Socket connected to user
Socket::Connection SS; ///< Socket connected to server
std::string streamname = "/tmp/shared_socket"; ///< Stream that will be opened
void parseChunk();
void parseChunk(std::string & buffer);///< Parses a single RTMP chunk.
void sendCommand(AMF::Object & amfreply, int messagetype, int stream_id);///< Sends a RTMP command either in AMF or AMF3 mode.
void parseAMFCommand(AMF::Object & amfdata, int messagetype, int stream_id);///< Parses a single AMF command message.
int Connector_RTMP(Socket::Connection conn);
};//Connector_RTMP namespace;
@ -35,8 +37,9 @@ namespace Connector_RTMP{
/// Main Connector_RTMP function
int Connector_RTMP::Connector_RTMP(Socket::Connection conn){
Socket = conn;
FLV::Tag tag, viddata, auddata;
bool viddone = false, auddone = false;
FLV::Tag tag, init_tag;
DTSC::Stream Strm;
bool stream_inited = false;//true if init data for audio/video was sent
//first timestamp set
RTMPStream::firsttime = RTMPStream::getNowMS();
@ -61,15 +64,10 @@ int Connector_RTMP::Connector_RTMP(Socket::Connection conn){
unsigned int lastStats = 0;
while (Socket.connected() && !FLV::Parse_Error){
//only parse input if available or not yet init'ed
//rightnow = getNowMS();
if (Socket.canRead() || !ready4data){// || (snd_cnt - snd_window_at >= snd_window_size)
switch (Socket.ready()){
case -1: break; //disconnected
case 0: break; //not ready yet
default: parseChunk(); break; //new data is waiting
}
while (Socket.connected()){
sleep(10000);//sleep 10ms to prevent high CPU usage
if (Socket.spool()){
parseChunk(Socket.Received());
}
if (ready4data){
if (!inited){
@ -95,54 +93,27 @@ int Connector_RTMP::Connector_RTMP(Socket::Connection conn){
SS.write(stat);
}
}
SS.canRead();
switch (SS.ready()){
case -1:
#if DEBUG >= 1
fprintf(stderr, "Source socket is disconnected.\n");
#endif
Socket.close();//disconnect user
break;
case 0: break;//not ready yet
default:
bool justdone = false;
if (tag.SockLoader(SS)){//able to read a full packet?
//init data? parse and resent in correct order if all is received
/// \todo Check metadata for needed audio/video init or not - we now assume both video/audio are always present...
if (((tag.data[0] == 0x09) && !viddone) || ((tag.data[0] == 0x08) && !auddone)){
if (tag.needsInitData()){
if (tag.data[0] == 0x09){viddata = tag;}else{auddata = tag;}
}
if (tag.data[0] == 0x09){viddone = true;}else{auddone = true;}
justdone = true;
if (SS.spool()){
if (Strm.parsePacket(SS.Received())){
//sent init data if needed
if (!stream_inited){
if (Strm.metadata.getContentP("audio") && Strm.metadata.getContentP("audio")->getContentP("init")){
init_tag.DTSCAudioInit(Strm);
Socket.write(RTMPStream::SendMedia(init_tag));
}
if (viddone && auddone && justdone){
if (viddata.len != 0){
Socket.write(RTMPStream::SendMedia(viddata));
#if DEBUG >= 8
fprintf(stderr, "Sent tag to %i: [%u] %s\n", Socket.getSocket(), viddata.tagTime(), viddata.tagType().c_str());
#endif
}
if (auddata.len != 0){
Socket.write(RTMPStream::SendMedia(auddata));
#if DEBUG >= 8
fprintf(stderr, "Sent tag to %i: [%u] %s\n", Socket.getSocket(), auddata.tagTime(), auddata.tagType().c_str());
#endif
}
break;
if (Strm.metadata.getContentP("video") && Strm.metadata.getContentP("video")->getContentP("init")){
init_tag.DTSCVideoInit(Strm);
Socket.write(RTMPStream::SendMedia(init_tag));
}
//not gotten init yet? cancel this tag
if (tag.needsInitData()){
if ((tag.data[0] == 0x09) && (viddata.len == 0)){break;}
if ((tag.data[0] == 0x08) && (auddata.len == 0)){break;}
}
//send tag normally
Socket.write(RTMPStream::SendMedia(tag));
#if DEBUG >= 8
fprintf(stderr, "Sent tag to %i: [%u] %s\n", Socket.getSocket(), tag.tagTime(), tag.tagType().c_str());
#endif
stream_inited = true;
}
break;
//sent a tag
tag.DTSCLoader(Strm);
Socket.write(RTMPStream::SendMedia(tag));
#if DEBUG >= 8
fprintf(stderr, "Sent tag to %i: [%u] %s\n", Socket.getSocket(), tag.tagTime(), tag.tagType().c_str());
#endif
}
}
}
}
@ -165,15 +136,13 @@ int Connector_RTMP::Connector_RTMP(Socket::Connection conn){
}//Connector_RTMP
/// Tries to get and parse one RTMP chunk at a time.
void Connector_RTMP::parseChunk(){
void Connector_RTMP::parseChunk(std::string & inbuffer){
static RTMPStream::Chunk next;
static std::string inbuffer;
FLV::Tag F;
static AMF::Object amfdata("empty", AMF::AMF0_DDV_CONTAINER);
static AMF::Object amfelem("empty", AMF::AMF0_DDV_CONTAINER);
static AMF::Object3 amf3data("empty", AMF::AMF3_DDV_CONTAINER);
static AMF::Object3 amf3elem("empty", AMF::AMF3_DDV_CONTAINER);
if (!Connector_RTMP::Socket.read(inbuffer)){return;} //try to get more data
while (next.Parse(inbuffer)){
@ -246,6 +215,7 @@ void Connector_RTMP::parseChunk(){
#if DEBUG >= 4
fprintf(stderr, "A");
#endif
/// \TODO Convert to DTSC properly.
SS.write(std::string(F.data, F.len));
}else{
#if DEBUG >= 4
@ -260,6 +230,7 @@ void Connector_RTMP::parseChunk(){
#if DEBUG >= 4
fprintf(stderr, "V");
#endif
/// \TODO Convert to DTSC properly.
SS.write(std::string(F.data, F.len));
}else{
#if DEBUG >= 4
@ -279,7 +250,6 @@ void Connector_RTMP::parseChunk(){
#endif
break;
case 17:{
bool parsed3 = false;
#if DEBUG >= 4
fprintf(stderr, "Received AFM3 command message\n");
#endif
@ -295,205 +265,7 @@ void Connector_RTMP::parseChunk(){
#endif
next.data = next.data.substr(1);
amfdata = AMF::parse(next.data);
#if DEBUG >= 4
amfdata.Print();
#endif
if (amfdata.getContentP(0)->StrValue() == "connect"){
double objencoding = 0;
if (amfdata.getContentP(2)->getContentP("objectEncoding")){
objencoding = amfdata.getContentP(2)->getContentP("objectEncoding")->NumValue();
}
fprintf(stderr, "Object encoding set to %e\n", objencoding);
#if DEBUG >= 4
int tmpint;
tmpint = (int)amfdata.getContentP(2)->getContentP("videoCodecs")->NumValue();
if (tmpint & 0x04){fprintf(stderr, "Sorensen video support detected\n");}
if (tmpint & 0x80){fprintf(stderr, "H264 video support detected\n");}
tmpint = (int)amfdata.getContentP(2)->getContentP("audioCodecs")->NumValue();
if (tmpint & 0x04){fprintf(stderr, "MP3 audio support detected\n");}
if (tmpint & 0x400){fprintf(stderr, "AAC audio support detected\n");}
#endif
Socket.write(RTMPStream::SendCTL(5, RTMPStream::snd_window_size));//send window acknowledgement size (msg 5)
Socket.write(RTMPStream::SendCTL(6, RTMPStream::rec_window_size));//send window acknowledgement size (msg 5)
Socket.write(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1
//send a _result reply
AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER);
amfreply.addContent(AMF::Object("", "_result"));//result success
amfreply.addContent(amfdata.getContent(1));//same transaction ID
amfreply.addContent(AMF::Object(""));//server properties
amfreply.getContentP(2)->addContent(AMF::Object("fmsVer", "FMS/3,0,1,123"));
amfreply.getContentP(2)->addContent(AMF::Object("capabilities", (double)31));
//amfreply.getContentP(2)->addContent(AMF::Object("mode", (double)1));
amfreply.addContent(AMF::Object(""));//info
amfreply.getContentP(3)->addContent(AMF::Object("level", "status"));
amfreply.getContentP(3)->addContent(AMF::Object("code", "NetConnection.Connect.Success"));
amfreply.getContentP(3)->addContent(AMF::Object("description", "Connection succeeded."));
amfreply.getContentP(3)->addContent(AMF::Object("clientid", 1337));
amfreply.getContentP(3)->addContent(AMF::Object("objectEncoding", objencoding));
//amfreply.getContentP(3)->addContent(AMF::Object("data", AMF::AMF0_ECMA_ARRAY));
//amfreply.getContentP(3)->getContentP(4)->addContent(AMF::Object("version", "3,5,4,1004"));
#if DEBUG >= 4
amfreply.Print();
#endif
Socket.write(RTMPStream::SendChunk(3, 17, next.msg_stream_id, (char)0+amfreply.Pack()));
//send onBWDone packet - no clue what it is, but real server sends it...
amfreply = AMF::Object("container", AMF::AMF0_DDV_CONTAINER);
amfreply.addContent(AMF::Object("", "onBWDone"));//result
amfreply.addContent(amfdata.getContent(1));//same transaction ID
amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null
Socket.write(RTMPStream::SendChunk(3, 17, next.msg_stream_id, (char)0+amfreply.Pack()));
parsed3 = true;
}//connect
if (amfdata.getContentP(0)->StrValue() == "createStream"){
//send a _result reply
AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER);
amfreply.addContent(AMF::Object("", "_result"));//result success
amfreply.addContent(amfdata.getContent(1));//same transaction ID
amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info
amfreply.addContent(AMF::Object("", (double)1));//stream ID - we use 1
#if DEBUG >= 4
amfreply.Print();
#endif
Socket.write(RTMPStream::SendChunk(3, 17, next.msg_stream_id, (char)0+amfreply.Pack()));
Socket.write(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1
parsed3 = true;
}//createStream
if ((amfdata.getContentP(0)->StrValue() == "closeStream") || (amfdata.getContentP(0)->StrValue() == "deleteStream")){
if (SS.connected()){SS.close();}
}
if ((amfdata.getContentP(0)->StrValue() == "getStreamLength") || (amfdata.getContentP(0)->StrValue() == "getMovLen")){
//send a _result reply
AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER);
amfreply.addContent(AMF::Object("", "_result"));//result success
amfreply.addContent(amfdata.getContent(1));//same transaction ID
amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info
amfreply.addContent(AMF::Object("", (double)0));//zero length
#if DEBUG >= 4
amfreply.Print();
#endif
Socket.write(RTMPStream::SendChunk(3, 17, next.msg_stream_id, (char)0+amfreply.Pack()));
parsed3 = true;
}//getStreamLength
if ((amfdata.getContentP(0)->StrValue() == "publish")){
if (amfdata.getContentP(3)){
streamname = amfdata.getContentP(3)->StrValue();
for (std::string::iterator i=streamname.begin(); i != streamname.end(); ++i){
if (*i == '?'){streamname.erase(i, streamname.end()); break;}
if (!isalpha(*i) && !isdigit(*i) && *i != '_'){
streamname.erase(i);
--i;
}else{
*i=tolower(*i);
}
}
streamname = "/tmp/shared_socket_" + streamname;
#if DEBUG >= 4
fprintf(stderr, "Connecting to buffer %s...\n", streamname.c_str());
#endif
SS = Socket::Connection(streamname);
if (!SS.connected()){
#if DEBUG >= 1
fprintf(stderr, "Could not connect to server!\n");
#endif
Socket.close();//disconnect user
break;
}
SS.write("P "+Socket.getHost()+'\n');
nostats = true;
#if DEBUG >= 4
fprintf(stderr, "Connected to buffer, starting to sent data...\n");
#endif
}
//send a _result reply
AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER);
amfreply.addContent(AMF::Object("", "_result"));//result success
amfreply.addContent(amfdata.getContent(1));//same transaction ID
amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info
amfreply.addContent(AMF::Object("", 1, AMF::AMF0_BOOL));//publish success?
#if DEBUG >= 4
amfreply.Print();
#endif
Socket.write(RTMPStream::SendChunk(3, 17, next.msg_stream_id, (char)0+amfreply.Pack()));
Socket.write(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1
//send a status reply
amfreply = AMF::Object("container", AMF::AMF0_DDV_CONTAINER);
amfreply.addContent(AMF::Object("", "onStatus"));//status reply
amfreply.addContent(AMF::Object("", 0, AMF::AMF0_NUMBER));//same transaction ID
amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info
amfreply.addContent(AMF::Object(""));//info
amfreply.getContentP(3)->addContent(AMF::Object("level", "status"));
amfreply.getContentP(3)->addContent(AMF::Object("code", "NetStream.Publish.Start"));
amfreply.getContentP(3)->addContent(AMF::Object("description", "Stream is now published!"));
amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337));
#if DEBUG >= 4
amfreply.Print();
#endif
Socket.write(RTMPStream::SendChunk(3, 17, next.msg_stream_id, (char)0+amfreply.Pack()));
parsed3 = true;
}//getStreamLength
if (amfdata.getContentP(0)->StrValue() == "checkBandwidth"){
//send a _result reply
AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER);
amfreply.addContent(AMF::Object("", "_result"));//result success
amfreply.addContent(amfdata.getContent(1));//same transaction ID
amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info
amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info
#if DEBUG >= 4
amfreply.Print();
#endif
Socket.write(RTMPStream::SendChunk(3, 17, 1, (char)0+amfreply.Pack()));
parsed3 = true;
}//checkBandwidth
if ((amfdata.getContentP(0)->StrValue() == "play") || (amfdata.getContentP(0)->StrValue() == "play2")){
//send streambegin
streamname = amfdata.getContentP(3)->StrValue();
for (std::string::iterator i=streamname.end()-1; i>=streamname.begin(); --i){
if (!isalpha(*i) && !isdigit(*i) && *i != '_'){streamname.erase(i);}else{*i=tolower(*i);}
}
streamname = "/tmp/shared_socket_" + streamname;
Socket.write(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1
//send a status reply
AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER);
amfreply.addContent(AMF::Object("", "onStatus"));//status reply
amfreply.addContent(amfdata.getContent(1));//same transaction ID
amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info
amfreply.addContent(AMF::Object(""));//info
amfreply.getContentP(3)->addContent(AMF::Object("level", "status"));
amfreply.getContentP(3)->addContent(AMF::Object("code", "NetStream.Play.Reset"));
amfreply.getContentP(3)->addContent(AMF::Object("description", "Playing and resetting..."));
amfreply.getContentP(3)->addContent(AMF::Object("details", "PLS"));
amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337));
#if DEBUG >= 4
amfreply.Print();
#endif
Socket.write(RTMPStream::SendChunk(4, 17, next.msg_stream_id, (char)0+amfreply.Pack()));
amfreply = AMF::Object("container", AMF::AMF0_DDV_CONTAINER);
amfreply.addContent(AMF::Object("", "onStatus"));//status reply
amfreply.addContent(amfdata.getContent(1));//same transaction ID
amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info
amfreply.addContent(AMF::Object(""));//info
amfreply.getContentP(3)->addContent(AMF::Object("level", "status"));
amfreply.getContentP(3)->addContent(AMF::Object("code", "NetStream.Play.Start"));
amfreply.getContentP(3)->addContent(AMF::Object("description", "Playing!"));
amfreply.getContentP(3)->addContent(AMF::Object("details", "PLS"));
amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337));
#if DEBUG >= 4
amfreply.Print();
#endif
Socket.write(RTMPStream::SendChunk(4, 17, 1, (char)0+amfreply.Pack()));
RTMPStream::chunk_snd_max = 102400;//100KiB
Socket.write(RTMPStream::SendCTL(1, RTMPStream::chunk_snd_max));//send chunk size max (msg 1)
Connector_RTMP::ready4data = true;//start sending video data!
parsed3 = true;
}//createStream
#if DEBUG >= 3
fprintf(stderr, "AMF0 command: %s\n", amfdata.getContentP(0)->StrValue().c_str());
#endif
if (!parsed3){
#if DEBUG >= 2
fprintf(stderr, "AMF0 command not processed! :(\n");
#endif
}
parseAMFCommand(amfdata, 17, next.msg_stream_id);
}//parsing AMF0-style
} break;
case 18:
@ -511,212 +283,8 @@ void Connector_RTMP::parseChunk(){
#endif
break;
case 20:{//AMF0 command message
bool parsed = false;
amfdata = AMF::parse(next.data);
#if DEBUG >= 4
amfdata.Print();
#endif
if (amfdata.getContentP(0)->StrValue() == "connect"){
double objencoding = 0;
if (amfdata.getContentP(2)->getContentP("objectEncoding")){
objencoding = amfdata.getContentP(2)->getContentP("objectEncoding")->NumValue();
}
fprintf(stderr, "Object encoding set to %e\n", objencoding);
#if DEBUG >= 4
int tmpint;
if (amfdata.getContentP(2)->getContentP("videoCodecs")){
tmpint = (int)amfdata.getContentP(2)->getContentP("videoCodecs")->NumValue();
if (tmpint & 0x04){fprintf(stderr, "Sorensen video support detected\n");}
if (tmpint & 0x80){fprintf(stderr, "H264 video support detected\n");}
}
if (amfdata.getContentP(2)->getContentP("audioCodecs")){
tmpint = (int)amfdata.getContentP(2)->getContentP("audioCodecs")->NumValue();
if (tmpint & 0x04){fprintf(stderr, "MP3 audio support detected\n");}
if (tmpint & 0x400){fprintf(stderr, "AAC video support detected\n");}
}
#endif
RTMPStream::chunk_snd_max = 4096;
Socket.write(RTMPStream::SendCTL(1, RTMPStream::chunk_snd_max));//send chunk size max (msg 1)
Socket.write(RTMPStream::SendCTL(5, RTMPStream::snd_window_size));//send window acknowledgement size (msg 5)
Socket.write(RTMPStream::SendCTL(6, RTMPStream::rec_window_size));//send rec window acknowledgement size (msg 6)
Socket.write(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1
//send a _result reply
AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER);
amfreply.addContent(AMF::Object("", "_result"));//result success
amfreply.addContent(amfdata.getContent(1));//same transaction ID
amfreply.addContent(AMF::Object(""));//server properties
amfreply.getContentP(2)->addContent(AMF::Object("fmsVer", "FMS/3,0,1,123"));
amfreply.getContentP(2)->addContent(AMF::Object("capabilities", (double)31));
//amfreply.getContentP(2)->addContent(AMF::Object("mode", (double)1));
amfreply.addContent(AMF::Object(""));//info
amfreply.getContentP(3)->addContent(AMF::Object("level", "status"));
amfreply.getContentP(3)->addContent(AMF::Object("code", "NetConnection.Connect.Success"));
amfreply.getContentP(3)->addContent(AMF::Object("description", "Connection succeeded."));
amfreply.getContentP(3)->addContent(AMF::Object("clientid", 1337));
amfreply.getContentP(3)->addContent(AMF::Object("objectEncoding", objencoding));
//amfreply.getContentP(3)->addContent(AMF::Object("data", AMF::AMF0_ECMA_ARRAY));
//amfreply.getContentP(3)->getContentP(4)->addContent(AMF::Object("version", "3,5,4,1004"));
#if DEBUG >= 4
amfreply.Print();
#endif
Socket.write(RTMPStream::SendChunk(3, 20, next.msg_stream_id, amfreply.Pack()));
//send onBWDone packet - no clue what it is, but real server sends it...
amfreply = AMF::Object("container", AMF::AMF0_DDV_CONTAINER);
amfreply.addContent(AMF::Object("", "onBWDone"));//result
amfreply.addContent(AMF::Object("", (double)0));//zero
amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null
Socket.write(RTMPStream::SendChunk(3, 20, next.msg_stream_id, amfreply.Pack()));
parsed = true;
}//connect
if (amfdata.getContentP(0)->StrValue() == "createStream"){
//send a _result reply
AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER);
amfreply.addContent(AMF::Object("", "_result"));//result success
amfreply.addContent(amfdata.getContent(1));//same transaction ID
amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info
amfreply.addContent(AMF::Object("", (double)1));//stream ID - we use 1
#if DEBUG >= 4
amfreply.Print();
#endif
Socket.write(RTMPStream::SendChunk(3, 20, next.msg_stream_id, amfreply.Pack()));
Socket.write(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1
parsed = true;
}//createStream
if ((amfdata.getContentP(0)->StrValue() == "closeStream") || (amfdata.getContentP(0)->StrValue() == "deleteStream")){
if (SS.connected()){SS.close();}
}
if ((amfdata.getContentP(0)->StrValue() == "getStreamLength") || (amfdata.getContentP(0)->StrValue() == "getMovLen")){
//send a _result reply
AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER);
amfreply.addContent(AMF::Object("", "_result"));//result success
amfreply.addContent(amfdata.getContent(1));//same transaction ID
amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info
amfreply.addContent(AMF::Object("", (double)0));//zero length
#if DEBUG >= 4
amfreply.Print();
#endif
Socket.write(RTMPStream::SendChunk(3, 20, next.msg_stream_id, amfreply.Pack()));
parsed = true;
}//getStreamLength
if ((amfdata.getContentP(0)->StrValue() == "publish")){
if (amfdata.getContentP(3)){
streamname = amfdata.getContentP(3)->StrValue();
for (std::string::iterator i=streamname.begin(); i != streamname.end(); ++i){
if (*i == '?'){streamname.erase(i, streamname.end()); break;}
if (!isalpha(*i) && !isdigit(*i) && *i != '_'){
streamname.erase(i);
--i;
}else{
*i=tolower(*i);
}
}
streamname = "/tmp/shared_socket_" + streamname;
#if DEBUG >= 4
fprintf(stderr, "Connecting to buffer %s...\n", streamname.c_str());
#endif
SS = Socket::Connection(streamname);
if (!SS.connected()){
#if DEBUG >= 1
fprintf(stderr, "Could not connect to server!\n");
#endif
Socket.close();//disconnect user
break;
}
SS.write("P "+Socket.getHost()+'\n');
nostats = true;
#if DEBUG >= 4
fprintf(stderr, "Connected to buffer, starting to send data...\n");
#endif
}
//send a _result reply
AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER);
amfreply.addContent(AMF::Object("", "_result"));//result success
amfreply.addContent(amfdata.getContent(1));//same transaction ID
amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info
amfreply.addContent(AMF::Object("", 1, AMF::AMF0_BOOL));//publish success?
#if DEBUG >= 4
amfreply.Print();
#endif
Socket.write(RTMPStream::SendChunk(3, 20, next.msg_stream_id, amfreply.Pack()));
Socket.write(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1
//send a status reply
amfreply = AMF::Object("container", AMF::AMF0_DDV_CONTAINER);
amfreply.addContent(AMF::Object("", "onStatus"));//status reply
amfreply.addContent(AMF::Object("", 0, AMF::AMF0_NUMBER));//same transaction ID
amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info
amfreply.addContent(AMF::Object(""));//info
amfreply.getContentP(3)->addContent(AMF::Object("level", "status"));
amfreply.getContentP(3)->addContent(AMF::Object("code", "NetStream.Publish.Start"));
amfreply.getContentP(3)->addContent(AMF::Object("description", "Stream is now published!"));
amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337));
#if DEBUG >= 4
amfreply.Print();
#endif
Socket.write(RTMPStream::SendChunk(4, 20, next.msg_stream_id, amfreply.Pack()));
parsed = true;
}//getStreamLength
if (amfdata.getContentP(0)->StrValue() == "checkBandwidth"){
//send a _result reply
AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER);
amfreply.addContent(AMF::Object("", "_result"));//result success
amfreply.addContent(amfdata.getContent(1));//same transaction ID
amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info
amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info
#if DEBUG >= 4
amfreply.Print();
#endif
Socket.write(RTMPStream::SendChunk(3, 20, 1, amfreply.Pack()));
parsed = true;
}//checkBandwidth
if ((amfdata.getContentP(0)->StrValue() == "play") || (amfdata.getContentP(0)->StrValue() == "play2")){
//send streambegin
streamname = amfdata.getContentP(3)->StrValue();
for (std::string::iterator i=streamname.end()-1; i>=streamname.begin(); --i){
if (!isalpha(*i) && !isdigit(*i) && *i != '_'){streamname.erase(i);}else{*i=tolower(*i);}
}
streamname = "/tmp/shared_socket_" + streamname;
Socket.write(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1
//send a status reply
AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER);
amfreply.addContent(AMF::Object("", "onStatus"));//status reply
amfreply.addContent(amfdata.getContent(1));//same transaction ID
amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info
amfreply.addContent(AMF::Object(""));//info
amfreply.getContentP(3)->addContent(AMF::Object("level", "status"));
amfreply.getContentP(3)->addContent(AMF::Object("code", "NetStream.Play.Reset"));
amfreply.getContentP(3)->addContent(AMF::Object("description", "Playing and resetting..."));
amfreply.getContentP(3)->addContent(AMF::Object("details", "PLS"));
amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337));
#if DEBUG >= 4
amfreply.Print();
#endif
Socket.write(RTMPStream::SendChunk(4, 20, next.msg_stream_id, amfreply.Pack()));
amfreply = AMF::Object("container", AMF::AMF0_DDV_CONTAINER);
amfreply.addContent(AMF::Object("", "onStatus"));//status reply
amfreply.addContent(amfdata.getContent(1));//same transaction ID
amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info
amfreply.addContent(AMF::Object(""));//info
amfreply.getContentP(3)->addContent(AMF::Object("level", "status"));
amfreply.getContentP(3)->addContent(AMF::Object("code", "NetStream.Play.Start"));
amfreply.getContentP(3)->addContent(AMF::Object("description", "Playing!"));
amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337));
#if DEBUG >= 4
amfreply.Print();
#endif
Socket.write(RTMPStream::SendChunk(4, 20, 1, amfreply.Pack()));
RTMPStream::chunk_snd_max = 102400;//100KiB;
Socket.write(RTMPStream::SendCTL(1, RTMPStream::chunk_snd_max));//send chunk size max (msg 1)
Connector_RTMP::ready4data = true;//start sending video data!
parsed = true;
}//createStream
#if DEBUG >= 3
fprintf(stderr, "AMF0 command: %s\n", amfdata.getContentP(0)->StrValue().c_str());
#endif
if (!parsed){
#if DEBUG >= 2
fprintf(stderr, "AMF0 command not processed! :(\n");
#endif
}
parseAMFCommand(amfdata, 20, next.msg_stream_id);
} break;
case 22:
#if DEBUG >= 4
@ -733,6 +301,223 @@ void Connector_RTMP::parseChunk(){
}
}//parseChunk
void Connector_RTMP::sendCommand(AMF::Object & amfreply, int messagetype, int stream_id){
if (messagetype == 17){
Socket.write(RTMPStream::SendChunk(3, messagetype, stream_id, (char)0+amfreply.Pack()));
}else{
Socket.write(RTMPStream::SendChunk(3, messagetype, stream_id, amfreply.Pack()));
}
}//sendCommand
void Connector_RTMP::parseAMFCommand(AMF::Object & amfdata, int messagetype, int stream_id){
bool parsed = false;
#if DEBUG >= 4
amfdata.Print();
#endif
if (amfdata.getContentP(0)->StrValue() == "connect"){
double objencoding = 0;
if (amfdata.getContentP(2)->getContentP("objectEncoding")){
objencoding = amfdata.getContentP(2)->getContentP("objectEncoding")->NumValue();
}
fprintf(stderr, "Object encoding set to %e\n", objencoding);
#if DEBUG >= 4
int tmpint;
if (amfdata.getContentP(2)->getContentP("videoCodecs")){
tmpint = (int)amfdata.getContentP(2)->getContentP("videoCodecs")->NumValue();
if (tmpint & 0x04){fprintf(stderr, "Sorensen video support detected\n");}
if (tmpint & 0x80){fprintf(stderr, "H264 video support detected\n");}
}
if (amfdata.getContentP(2)->getContentP("audioCodecs")){
tmpint = (int)amfdata.getContentP(2)->getContentP("audioCodecs")->NumValue();
if (tmpint & 0x04){fprintf(stderr, "MP3 audio support detected\n");}
if (tmpint & 0x400){fprintf(stderr, "AAC audio support detected\n");}
}
#endif
RTMPStream::chunk_snd_max = 4096;
Socket.write(RTMPStream::SendCTL(1, RTMPStream::chunk_snd_max));//send chunk size max (msg 1)
Socket.write(RTMPStream::SendCTL(5, RTMPStream::snd_window_size));//send window acknowledgement size (msg 5)
Socket.write(RTMPStream::SendCTL(6, RTMPStream::rec_window_size));//send rec window acknowledgement size (msg 6)
Socket.write(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1
//send a _result reply
AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER);
amfreply.addContent(AMF::Object("", "_result"));//result success
amfreply.addContent(amfdata.getContent(1));//same transaction ID
amfreply.addContent(AMF::Object(""));//server properties
amfreply.getContentP(2)->addContent(AMF::Object("fmsVer", "FMS/3,0,1,123"));
amfreply.getContentP(2)->addContent(AMF::Object("capabilities", (double)31));
//amfreply.getContentP(2)->addContent(AMF::Object("mode", (double)1));
amfreply.addContent(AMF::Object(""));//info
amfreply.getContentP(3)->addContent(AMF::Object("level", "status"));
amfreply.getContentP(3)->addContent(AMF::Object("code", "NetConnection.Connect.Success"));
amfreply.getContentP(3)->addContent(AMF::Object("description", "Connection succeeded."));
amfreply.getContentP(3)->addContent(AMF::Object("clientid", 1337));
amfreply.getContentP(3)->addContent(AMF::Object("objectEncoding", objencoding));
//amfreply.getContentP(3)->addContent(AMF::Object("data", AMF::AMF0_ECMA_ARRAY));
//amfreply.getContentP(3)->getContentP(4)->addContent(AMF::Object("version", "3,5,4,1004"));
#if DEBUG >= 4
amfreply.Print();
#endif
sendCommand(amfreply, messagetype, stream_id);
//send onBWDone packet - no clue what it is, but real server sends it...
amfreply = AMF::Object("container", AMF::AMF0_DDV_CONTAINER);
amfreply.addContent(AMF::Object("", "onBWDone"));//result
amfreply.addContent(amfdata.getContent(1));//same transaction ID
amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null
sendCommand(amfreply, messagetype, stream_id);
parsed = true;
}//connect
if (amfdata.getContentP(0)->StrValue() == "createStream"){
//send a _result reply
AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER);
amfreply.addContent(AMF::Object("", "_result"));//result success
amfreply.addContent(amfdata.getContent(1));//same transaction ID
amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info
amfreply.addContent(AMF::Object("", (double)1));//stream ID - we use 1
#if DEBUG >= 4
amfreply.Print();
#endif
sendCommand(amfreply, messagetype, stream_id);
Socket.write(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1
parsed = true;
}//createStream
if ((amfdata.getContentP(0)->StrValue() == "closeStream") || (amfdata.getContentP(0)->StrValue() == "deleteStream")){
if (SS.connected()){SS.close();}
}
if ((amfdata.getContentP(0)->StrValue() == "getStreamLength") || (amfdata.getContentP(0)->StrValue() == "getMovLen")){
//send a _result reply
AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER);
amfreply.addContent(AMF::Object("", "_result"));//result success
amfreply.addContent(amfdata.getContent(1));//same transaction ID
amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info
amfreply.addContent(AMF::Object("", (double)0));//zero length
#if DEBUG >= 4
amfreply.Print();
#endif
sendCommand(amfreply, messagetype, stream_id);
parsed = true;
}//getStreamLength
if ((amfdata.getContentP(0)->StrValue() == "publish")){
if (amfdata.getContentP(3)){
streamname = amfdata.getContentP(3)->StrValue();
for (std::string::iterator i=streamname.begin(); i != streamname.end(); ++i){
if (*i == '?'){streamname.erase(i, streamname.end()); break;}
if (!isalpha(*i) && !isdigit(*i) && *i != '_'){
streamname.erase(i);
--i;
}else{
*i=tolower(*i);
}
}
streamname = "/tmp/shared_socket_" + streamname;
#if DEBUG >= 4
fprintf(stderr, "Connecting to buffer %s...\n", streamname.c_str());
#endif
SS = Socket::Connection(streamname);
if (!SS.connected()){
#if DEBUG >= 1
fprintf(stderr, "Could not connect to server!\n");
#endif
Socket.close();//disconnect user
return;
}
SS.write("P "+Socket.getHost()+'\n');
nostats = true;
#if DEBUG >= 4
fprintf(stderr, "Connected to buffer, starting to send data...\n");
#endif
}
//send a _result reply
AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER);
amfreply.addContent(AMF::Object("", "_result"));//result success
amfreply.addContent(amfdata.getContent(1));//same transaction ID
amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info
amfreply.addContent(AMF::Object("", 1, AMF::AMF0_BOOL));//publish success?
#if DEBUG >= 4
amfreply.Print();
#endif
sendCommand(amfreply, messagetype, stream_id);
Socket.write(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1
//send a status reply
amfreply = AMF::Object("container", AMF::AMF0_DDV_CONTAINER);
amfreply.addContent(AMF::Object("", "onStatus"));//status reply
amfreply.addContent(AMF::Object("", 0, AMF::AMF0_NUMBER));//same transaction ID
amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info
amfreply.addContent(AMF::Object(""));//info
amfreply.getContentP(3)->addContent(AMF::Object("level", "status"));
amfreply.getContentP(3)->addContent(AMF::Object("code", "NetStream.Publish.Start"));
amfreply.getContentP(3)->addContent(AMF::Object("description", "Stream is now published!"));
amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337));
#if DEBUG >= 4
amfreply.Print();
#endif
sendCommand(amfreply, messagetype, stream_id);
parsed = true;
}//getStreamLength
if (amfdata.getContentP(0)->StrValue() == "checkBandwidth"){
//send a _result reply
AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER);
amfreply.addContent(AMF::Object("", "_result"));//result success
amfreply.addContent(amfdata.getContent(1));//same transaction ID
amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info
amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info
#if DEBUG >= 4
amfreply.Print();
#endif
sendCommand(amfreply, messagetype, stream_id);
parsed = true;
}//checkBandwidth
if ((amfdata.getContentP(0)->StrValue() == "play") || (amfdata.getContentP(0)->StrValue() == "play2")){
//send streambegin
streamname = amfdata.getContentP(3)->StrValue();
for (std::string::iterator i=streamname.end()-1; i>=streamname.begin(); --i){
if (!isalpha(*i) && !isdigit(*i) && *i != '_'){streamname.erase(i);}else{*i=tolower(*i);}
}
streamname = "/tmp/shared_socket_" + streamname;
Socket.write(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1
//send a status reply
AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER);
amfreply.addContent(AMF::Object("", "onStatus"));//status reply
amfreply.addContent(amfdata.getContent(1));//same transaction ID
amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info
amfreply.addContent(AMF::Object(""));//info
amfreply.getContentP(3)->addContent(AMF::Object("level", "status"));
amfreply.getContentP(3)->addContent(AMF::Object("code", "NetStream.Play.Reset"));
amfreply.getContentP(3)->addContent(AMF::Object("description", "Playing and resetting..."));
amfreply.getContentP(3)->addContent(AMF::Object("details", "DDV"));
amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337));
#if DEBUG >= 4
amfreply.Print();
#endif
sendCommand(amfreply, messagetype, stream_id);
amfreply = AMF::Object("container", AMF::AMF0_DDV_CONTAINER);
amfreply.addContent(AMF::Object("", "onStatus"));//status reply
amfreply.addContent(amfdata.getContent(1));//same transaction ID
amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info
amfreply.addContent(AMF::Object(""));//info
amfreply.getContentP(3)->addContent(AMF::Object("level", "status"));
amfreply.getContentP(3)->addContent(AMF::Object("code", "NetStream.Play.Start"));
amfreply.getContentP(3)->addContent(AMF::Object("description", "Playing!"));
amfreply.getContentP(3)->addContent(AMF::Object("details", "DDV"));
amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337));
#if DEBUG >= 4
amfreply.Print();
#endif
sendCommand(amfreply, messagetype, stream_id);
RTMPStream::chunk_snd_max = 102400;//100KiB
Socket.write(RTMPStream::SendCTL(1, RTMPStream::chunk_snd_max));//send chunk size max (msg 1)
Connector_RTMP::ready4data = true;//start sending video data!
parsed = true;
}//createStream
#if DEBUG >= 3
fprintf(stderr, "AMF0 command: %s\n", amfdata.getContentP(0)->StrValue().c_str());
#endif
if (!parsed){
#if DEBUG >= 2
fprintf(stderr, "AMF0 command not processed! :(\n");
#endif
}
}//parseAMFCommand
// Load main server setup file, default port 1935, handler is Connector_RTMP::Connector_RTMP
#define DEFAULT_PORT 1935

View file

@ -222,9 +222,10 @@ std::string Socket::Connection::getStats(std::string C){
}
/// Updates the downbuffer and upbuffer internal variables.
void Socket::Connection::spool(){
iread(downbuffer);
/// Returns true if new data was received, false otherwise.
bool Socket::Connection::spool(){
iwrite(upbuffer);
return iread(downbuffer);
}
/// Returns a reference to the download buffer.

View file

@ -48,7 +48,7 @@ namespace Socket{
bool swrite(std::string & buffer); ///< Write call that is compatible with std::string.
bool iread(std::string & buffer); ///< Incremental write call that is compatible with std::string.
bool iwrite(std::string & buffer); ///< Write call that is compatible with std::string.
void spool(); ///< Updates the downbuffer and upbuffer internal variables.
bool spool(); ///< Updates the downbuffer and upbuffer internal variables.
std::string & Received(); ///< Returns a reference to the download buffer.
void Send(std::string data); ///< Appends data to the upbuffer.
void close(); ///< Close connection.