RTMP seeking support.
This commit is contained in:
parent
a9db94a598
commit
58a6af9450
1 changed files with 59 additions and 53 deletions
|
@ -27,6 +27,11 @@ namespace Connector_RTMP{
|
||||||
bool nostats = false; ///< Set to true if no stats should be sent anymore (push mode).
|
bool nostats = false; ///< Set to true if no stats should be sent anymore (push mode).
|
||||||
bool stopparsing = false; ///< Set to true when all parsing needs to be cancelled.
|
bool stopparsing = false; ///< Set to true when all parsing needs to be cancelled.
|
||||||
|
|
||||||
|
//for reply to play command
|
||||||
|
int play_trans = -1;
|
||||||
|
int play_streamid = -1;
|
||||||
|
int play_msgtype = -1;
|
||||||
|
|
||||||
Socket::Connection Socket; ///< Socket connected to user
|
Socket::Connection Socket; ///< Socket connected to user
|
||||||
Socket::Connection SS; ///< Socket connected to server
|
Socket::Connection SS; ///< Socket connected to server
|
||||||
std::string streamname; ///< Stream that will be opened
|
std::string streamname; ///< Stream that will be opened
|
||||||
|
@ -101,6 +106,42 @@ int Connector_RTMP::Connector_RTMP(Socket::Connection conn){
|
||||||
}
|
}
|
||||||
if (SS.spool()){
|
if (SS.spool()){
|
||||||
while (Strm.parsePacket(SS.Received())){
|
while (Strm.parsePacket(SS.Received())){
|
||||||
|
|
||||||
|
if (play_trans != -1){
|
||||||
|
//send streambegin
|
||||||
|
Socket.Send(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1
|
||||||
|
//send streamisrecorded if stream, well, is recorded.
|
||||||
|
if (Strm.metadata.isMember("length") && Strm.metadata["length"].asInt() > 0){
|
||||||
|
Socket.Send(RTMPStream::SendUSR(4, 2));//send UCM StreamIsRecorded (4), stream 1
|
||||||
|
}
|
||||||
|
//send a status reply
|
||||||
|
AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER);
|
||||||
|
amfreply.addContent(AMF::Object("", "onStatus"));//status reply
|
||||||
|
amfreply.addContent(AMF::Object("", (double)play_trans));//same transaction ID
|
||||||
|
amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info
|
||||||
|
amfreply.addContent(AMF::Object(""));//info
|
||||||
|
amfreply.getContentP(3)->addContent(AMF::Object("level", "status"));
|
||||||
|
amfreply.getContentP(3)->addContent(AMF::Object("code", "NetStream.Play.Reset"));
|
||||||
|
amfreply.getContentP(3)->addContent(AMF::Object("description", "Playing and resetting..."));
|
||||||
|
amfreply.getContentP(3)->addContent(AMF::Object("details", "DDV"));
|
||||||
|
amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337));
|
||||||
|
sendCommand(amfreply, play_msgtype, play_streamid);
|
||||||
|
amfreply = AMF::Object("container", AMF::AMF0_DDV_CONTAINER);
|
||||||
|
amfreply.addContent(AMF::Object("", "onStatus"));//status reply
|
||||||
|
amfreply.addContent(AMF::Object("", (double)play_trans));//same transaction ID
|
||||||
|
amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info
|
||||||
|
amfreply.addContent(AMF::Object(""));//info
|
||||||
|
amfreply.getContentP(3)->addContent(AMF::Object("level", "status"));
|
||||||
|
amfreply.getContentP(3)->addContent(AMF::Object("code", "NetStream.Play.Start"));
|
||||||
|
amfreply.getContentP(3)->addContent(AMF::Object("description", "Playing!"));
|
||||||
|
amfreply.getContentP(3)->addContent(AMF::Object("details", "DDV"));
|
||||||
|
amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337));
|
||||||
|
sendCommand(amfreply, play_msgtype, play_streamid);
|
||||||
|
RTMPStream::chunk_snd_max = 102400;//100KiB
|
||||||
|
Socket.Send(RTMPStream::SendCTL(1, RTMPStream::chunk_snd_max));//send chunk size max (msg 1)
|
||||||
|
play_trans = -1;
|
||||||
|
}
|
||||||
|
|
||||||
//sent init data if needed
|
//sent init data if needed
|
||||||
if (!stream_inited){
|
if (!stream_inited){
|
||||||
init_tag.DTSCMetaInit(Strm);
|
init_tag.DTSCMetaInit(Strm);
|
||||||
|
@ -308,6 +349,9 @@ void Connector_RTMP::parseChunk(std::string & inbuffer){
|
||||||
}//parseChunk
|
}//parseChunk
|
||||||
|
|
||||||
void Connector_RTMP::sendCommand(AMF::Object & amfreply, int messagetype, int stream_id){
|
void Connector_RTMP::sendCommand(AMF::Object & amfreply, int messagetype, int stream_id){
|
||||||
|
#if DEBUG >= 4
|
||||||
|
amfreply.Print();
|
||||||
|
#endif
|
||||||
if (messagetype == 17){
|
if (messagetype == 17){
|
||||||
Socket.Send(RTMPStream::SendChunk(3, messagetype, stream_id, (char)0+amfreply.Pack()));
|
Socket.Send(RTMPStream::SendChunk(3, messagetype, stream_id, (char)0+amfreply.Pack()));
|
||||||
}else{
|
}else{
|
||||||
|
@ -361,9 +405,6 @@ void Connector_RTMP::parseAMFCommand(AMF::Object & amfdata, int messagetype, int
|
||||||
amfreply.getContentP(3)->addContent(AMF::Object("objectEncoding", objencoding));
|
amfreply.getContentP(3)->addContent(AMF::Object("objectEncoding", objencoding));
|
||||||
//amfreply.getContentP(3)->addContent(AMF::Object("data", AMF::AMF0_ECMA_ARRAY));
|
//amfreply.getContentP(3)->addContent(AMF::Object("data", AMF::AMF0_ECMA_ARRAY));
|
||||||
//amfreply.getContentP(3)->getContentP(4)->addContent(AMF::Object("version", "3,5,4,1004"));
|
//amfreply.getContentP(3)->getContentP(4)->addContent(AMF::Object("version", "3,5,4,1004"));
|
||||||
#if DEBUG >= 4
|
|
||||||
amfreply.Print();
|
|
||||||
#endif
|
|
||||||
sendCommand(amfreply, messagetype, stream_id);
|
sendCommand(amfreply, messagetype, stream_id);
|
||||||
//send onBWDone packet - no clue what it is, but real server sends it...
|
//send onBWDone packet - no clue what it is, but real server sends it...
|
||||||
//amfreply = AMF::Object("container", AMF::AMF0_DDV_CONTAINER);
|
//amfreply = AMF::Object("container", AMF::AMF0_DDV_CONTAINER);
|
||||||
|
@ -380,9 +421,6 @@ void Connector_RTMP::parseAMFCommand(AMF::Object & amfdata, int messagetype, int
|
||||||
amfreply.addContent(amfdata.getContent(1));//same transaction ID
|
amfreply.addContent(amfdata.getContent(1));//same transaction ID
|
||||||
amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info
|
amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info
|
||||||
amfreply.addContent(AMF::Object("", (double)1));//stream ID - we use 1
|
amfreply.addContent(AMF::Object("", (double)1));//stream ID - we use 1
|
||||||
#if DEBUG >= 4
|
|
||||||
amfreply.Print();
|
|
||||||
#endif
|
|
||||||
sendCommand(amfreply, messagetype, stream_id);
|
sendCommand(amfreply, messagetype, stream_id);
|
||||||
Socket.Send(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1
|
Socket.Send(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1
|
||||||
return;
|
return;
|
||||||
|
@ -398,9 +436,6 @@ void Connector_RTMP::parseAMFCommand(AMF::Object & amfdata, int messagetype, int
|
||||||
amfreply.addContent(amfdata.getContent(1));//same transaction ID
|
amfreply.addContent(amfdata.getContent(1));//same transaction ID
|
||||||
amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info
|
amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info
|
||||||
amfreply.addContent(AMF::Object("", (double)0));//zero length
|
amfreply.addContent(AMF::Object("", (double)0));//zero length
|
||||||
#if DEBUG >= 4
|
|
||||||
amfreply.Print();
|
|
||||||
#endif
|
|
||||||
sendCommand(amfreply, messagetype, stream_id);
|
sendCommand(amfreply, messagetype, stream_id);
|
||||||
return;
|
return;
|
||||||
}//getStreamLength
|
}//getStreamLength
|
||||||
|
@ -428,9 +463,6 @@ void Connector_RTMP::parseAMFCommand(AMF::Object & amfdata, int messagetype, int
|
||||||
amfreply.addContent(amfdata.getContent(1));//same transaction ID
|
amfreply.addContent(amfdata.getContent(1));//same transaction ID
|
||||||
amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info
|
amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info
|
||||||
amfreply.addContent(AMF::Object("", 1, AMF::AMF0_BOOL));//publish success?
|
amfreply.addContent(AMF::Object("", 1, AMF::AMF0_BOOL));//publish success?
|
||||||
#if DEBUG >= 4
|
|
||||||
amfreply.Print();
|
|
||||||
#endif
|
|
||||||
sendCommand(amfreply, messagetype, stream_id);
|
sendCommand(amfreply, messagetype, stream_id);
|
||||||
Socket.Send(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1
|
Socket.Send(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1
|
||||||
//send a status reply
|
//send a status reply
|
||||||
|
@ -443,9 +475,6 @@ void Connector_RTMP::parseAMFCommand(AMF::Object & amfdata, int messagetype, int
|
||||||
amfreply.getContentP(3)->addContent(AMF::Object("code", "NetStream.Publish.Start"));
|
amfreply.getContentP(3)->addContent(AMF::Object("code", "NetStream.Publish.Start"));
|
||||||
amfreply.getContentP(3)->addContent(AMF::Object("description", "Stream is now published!"));
|
amfreply.getContentP(3)->addContent(AMF::Object("description", "Stream is now published!"));
|
||||||
amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337));
|
amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337));
|
||||||
#if DEBUG >= 4
|
|
||||||
amfreply.Print();
|
|
||||||
#endif
|
|
||||||
sendCommand(amfreply, messagetype, stream_id);
|
sendCommand(amfreply, messagetype, stream_id);
|
||||||
return;
|
return;
|
||||||
}//getStreamLength
|
}//getStreamLength
|
||||||
|
@ -456,51 +485,28 @@ void Connector_RTMP::parseAMFCommand(AMF::Object & amfdata, int messagetype, int
|
||||||
amfreply.addContent(amfdata.getContent(1));//same transaction ID
|
amfreply.addContent(amfdata.getContent(1));//same transaction ID
|
||||||
amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info
|
amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info
|
||||||
amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info
|
amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info
|
||||||
#if DEBUG >= 4
|
|
||||||
amfreply.Print();
|
|
||||||
#endif
|
|
||||||
sendCommand(amfreply, messagetype, stream_id);
|
sendCommand(amfreply, messagetype, stream_id);
|
||||||
return;
|
return;
|
||||||
}//checkBandwidth
|
}//checkBandwidth
|
||||||
if ((amfdata.getContentP(0)->StrValue() == "play") || (amfdata.getContentP(0)->StrValue() == "play2")){
|
if ((amfdata.getContentP(0)->StrValue() == "play") || (amfdata.getContentP(0)->StrValue() == "play2")){
|
||||||
//send streambegin
|
//set reply number and stream name, actual reply is sent up in the SS.spool() handler
|
||||||
|
play_trans = amfdata.getContentP(1)->NumValue();
|
||||||
|
play_msgtype = messagetype;
|
||||||
|
play_streamid = stream_id;
|
||||||
streamname = amfdata.getContentP(3)->StrValue();
|
streamname = amfdata.getContentP(3)->StrValue();
|
||||||
Socket.Send(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1
|
|
||||||
//send a status reply
|
|
||||||
AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER);
|
|
||||||
amfreply.addContent(AMF::Object("", "onStatus"));//status reply
|
|
||||||
amfreply.addContent(amfdata.getContent(1));//same transaction ID
|
|
||||||
amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info
|
|
||||||
amfreply.addContent(AMF::Object(""));//info
|
|
||||||
amfreply.getContentP(3)->addContent(AMF::Object("level", "status"));
|
|
||||||
amfreply.getContentP(3)->addContent(AMF::Object("code", "NetStream.Play.Reset"));
|
|
||||||
amfreply.getContentP(3)->addContent(AMF::Object("description", "Playing and resetting..."));
|
|
||||||
amfreply.getContentP(3)->addContent(AMF::Object("details", "DDV"));
|
|
||||||
amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337));
|
|
||||||
#if DEBUG >= 4
|
|
||||||
amfreply.Print();
|
|
||||||
#endif
|
|
||||||
sendCommand(amfreply, messagetype, stream_id);
|
|
||||||
amfreply = AMF::Object("container", AMF::AMF0_DDV_CONTAINER);
|
|
||||||
amfreply.addContent(AMF::Object("", "onStatus"));//status reply
|
|
||||||
amfreply.addContent(amfdata.getContent(1));//same transaction ID
|
|
||||||
amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info
|
|
||||||
amfreply.addContent(AMF::Object(""));//info
|
|
||||||
amfreply.getContentP(3)->addContent(AMF::Object("level", "status"));
|
|
||||||
amfreply.getContentP(3)->addContent(AMF::Object("code", "NetStream.Play.Start"));
|
|
||||||
amfreply.getContentP(3)->addContent(AMF::Object("description", "Playing!"));
|
|
||||||
amfreply.getContentP(3)->addContent(AMF::Object("details", "DDV"));
|
|
||||||
amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337));
|
|
||||||
#if DEBUG >= 4
|
|
||||||
amfreply.Print();
|
|
||||||
#endif
|
|
||||||
sendCommand(amfreply, messagetype, stream_id);
|
|
||||||
RTMPStream::chunk_snd_max = 102400;//100KiB
|
|
||||||
Socket.Send(RTMPStream::SendCTL(1, RTMPStream::chunk_snd_max));//send chunk size max (msg 1)
|
|
||||||
Connector_RTMP::ready4data = true;//start sending video data!
|
Connector_RTMP::ready4data = true;//start sending video data!
|
||||||
return;
|
return;
|
||||||
}//createStream
|
}//play
|
||||||
|
if ((amfdata.getContentP(0)->StrValue() == "seek")){
|
||||||
|
//set reply number and stream name, actual reply is sent up in the SS.spool() handler
|
||||||
|
play_trans = amfdata.getContentP(1)->NumValue();
|
||||||
|
play_msgtype = messagetype;
|
||||||
|
play_streamid = stream_id;
|
||||||
|
stream_inited = false;
|
||||||
|
SS.Send("seek " + JSON::Value((long long int)amfdata.getContentP(3)->NumValue()).asString() + "\n");
|
||||||
|
return;
|
||||||
|
}//seek
|
||||||
|
|
||||||
#if DEBUG >= 2
|
#if DEBUG >= 2
|
||||||
fprintf(stderr, "AMF0 command not processed! :(\n");
|
fprintf(stderr, "AMF0 command not processed! :(\n");
|
||||||
#endif
|
#endif
|
||||||
|
|
Loading…
Add table
Reference in a new issue