PCM support in Flash-based protocols
This commit is contained in:
parent
bb5d0dda11
commit
cef78b7d08
5 changed files with 138 additions and 72 deletions
|
@ -182,7 +182,11 @@ const char * FLV::Tag::getVideoCodec() {
|
||||||
const char * FLV::Tag::getAudioCodec() {
|
const char * FLV::Tag::getAudioCodec() {
|
||||||
switch (data[11] & 0xF0) {
|
switch (data[11] & 0xF0) {
|
||||||
case 0x00:
|
case 0x00:
|
||||||
return "PCMPE";
|
if (data[11] & 0x02){
|
||||||
|
return "PCMPE";//unknown endianness
|
||||||
|
}else{
|
||||||
|
return "PCM";//8 bit is always regular PCM
|
||||||
|
}
|
||||||
case 0x10:
|
case 0x10:
|
||||||
return "ADPCM";
|
return "ADPCM";
|
||||||
case 0x20:
|
case 0x20:
|
||||||
|
|
|
@ -132,6 +132,18 @@ namespace Mist {
|
||||||
return getNext();
|
return getNext();
|
||||||
}
|
}
|
||||||
thisPacket.genericFill(tmpTag.tagTime(), tmpTag.offset(), tmpTag.getTrackID(), tmpTag.getData(), tmpTag.getDataLen(), lastBytePos, tmpTag.isKeyframe); //init packet from tmpTags data
|
thisPacket.genericFill(tmpTag.tagTime(), tmpTag.offset(), tmpTag.getTrackID(), tmpTag.getData(), tmpTag.getDataLen(), lastBytePos, tmpTag.isKeyframe); //init packet from tmpTags data
|
||||||
|
|
||||||
|
DTSC::Track & trk = myMeta.tracks[tmpTag.getTrackID()];
|
||||||
|
if (trk.codec == "PCM" && trk.size == 16){
|
||||||
|
char * ptr = 0;
|
||||||
|
uint32_t ptrSize = 0;
|
||||||
|
thisPacket.getString("data", ptr, ptrSize);
|
||||||
|
for (uint32_t i = 0; i < ptrSize; i+=2){
|
||||||
|
char tmpchar = ptr[i];
|
||||||
|
ptr[i] = ptr[i+1];
|
||||||
|
ptr[i+1] = tmpchar;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void inputFLV::seek(int seekTime) {
|
void inputFLV::seek(int seekTime) {
|
||||||
|
|
|
@ -172,7 +172,17 @@ namespace Mist {
|
||||||
H.Chunkify("", 0, myConn);
|
H.Chunkify("", 0, myConn);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
tag.DTSCLoader(thisPacket, myMeta.tracks[thisPacket.getTrackId()]);
|
DTSC::Track & trk = myMeta.tracks[thisPacket.getTrackId()];
|
||||||
|
tag.DTSCLoader(thisPacket, trk);
|
||||||
|
if (trk.codec == "PCM" && trk.size == 16){
|
||||||
|
char * ptr = tag.getData();
|
||||||
|
uint32_t ptrSize = tag.getDataLen();
|
||||||
|
for (uint32_t i = 0; i < ptrSize; i+=2){
|
||||||
|
char tmpchar = ptr[i];
|
||||||
|
ptr[i] = ptr[i+1];
|
||||||
|
ptr[i+1] = tmpchar;
|
||||||
|
}
|
||||||
|
}
|
||||||
if (tag.len){
|
if (tag.len){
|
||||||
H.Chunkify(tag.data, tag.len, myConn);
|
H.Chunkify(tag.data, tag.len, myConn);
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,7 +31,17 @@ namespace Mist {
|
||||||
}
|
}
|
||||||
|
|
||||||
void OutProgressiveFLV::sendNext(){
|
void OutProgressiveFLV::sendNext(){
|
||||||
tag.DTSCLoader(thisPacket, myMeta.tracks[thisPacket.getTrackId()]);
|
DTSC::Track & trk = myMeta.tracks[thisPacket.getTrackId()];
|
||||||
|
tag.DTSCLoader(thisPacket, trk);
|
||||||
|
if (trk.codec == "PCM" && trk.size == 16){
|
||||||
|
char * ptr = tag.getData();
|
||||||
|
uint32_t ptrSize = tag.getDataLen();
|
||||||
|
for (uint32_t i = 0; i < ptrSize; i+=2){
|
||||||
|
char tmpchar = ptr[i];
|
||||||
|
ptr[i] = ptr[i+1];
|
||||||
|
ptr[i+1] = tmpchar;
|
||||||
|
}
|
||||||
|
}
|
||||||
myConn.SendNow(tag.data, tag.len);
|
myConn.SendNow(tag.data, tag.len);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -121,7 +121,7 @@ namespace Mist {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void OutRTMP::init(Util::Config * cfg) {
|
void OutRTMP::init(Util::Config * cfg){
|
||||||
Output::init(cfg);
|
Output::init(cfg);
|
||||||
capa["name"] = "RTMP";
|
capa["name"] = "RTMP";
|
||||||
capa["desc"] = "Enables ingest and output over Adobe's RTMP protocol.";
|
capa["desc"] = "Enables ingest and output over Adobe's RTMP protocol.";
|
||||||
|
@ -150,7 +150,7 @@ namespace Mist {
|
||||||
config = cfg;
|
config = cfg;
|
||||||
}
|
}
|
||||||
|
|
||||||
void OutRTMP::sendNext() {
|
void OutRTMP::sendNext(){
|
||||||
|
|
||||||
//If there are now more selectable tracks, select the new track and do a seek to the current timestamp
|
//If there are now more selectable tracks, select the new track and do a seek to the current timestamp
|
||||||
//Set sentHeader to false to force it to send init data
|
//Set sentHeader to false to force it to send init data
|
||||||
|
@ -173,14 +173,16 @@ namespace Mist {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
char rtmpheader[] = {0, //byte 0 = cs_id | ch_type
|
char rtmpheader[] ={0, //byte 0 = cs_id | ch_type
|
||||||
0, 0, 0, //bytes 1-3 = timestamp
|
0, 0, 0, //bytes 1-3 = timestamp
|
||||||
0, 0, 0, //bytes 4-6 = length
|
0, 0, 0, //bytes 4-6 = length
|
||||||
0x12, //byte 7 = msg_type_id
|
0x12, //byte 7 = msg_type_id
|
||||||
1, 0, 0, 0, //bytes 8-11 = msg_stream_id = 1
|
1, 0, 0, 0, //bytes 8-11 = msg_stream_id = 1
|
||||||
0, 0, 0, 0}; //bytes 12-15 = extended timestamp
|
0, 0, 0, 0}; //bytes 12-15 = extended timestamp
|
||||||
char dataheader[] = {0, 0, 0, 0, 0};
|
char dataheader[] ={0, 0, 0, 0, 0};
|
||||||
unsigned int dheader_len = 1;
|
unsigned int dheader_len = 1;
|
||||||
|
static char * swappyPointer = 0;
|
||||||
|
static uint32_t swappySize = 0;
|
||||||
char * tmpData = 0;//pointer to raw media data
|
char * tmpData = 0;//pointer to raw media data
|
||||||
unsigned int data_len = 0;//length of processed media data
|
unsigned int data_len = 0;//length of processed media data
|
||||||
thisPacket.getString("data", tmpData, data_len);
|
thisPacket.getString("data", tmpData, data_len);
|
||||||
|
@ -228,13 +230,29 @@ namespace Mist {
|
||||||
dataheader[0] |= 0x20;
|
dataheader[0] |= 0x20;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (track.codec == "ADPCM") {
|
if (track.codec == "ADPCM"){
|
||||||
dataheader[0] |= 0x10;
|
dataheader[0] |= 0x10;
|
||||||
}
|
}
|
||||||
if (track.codec == "PCM") {
|
if (track.codec == "PCM"){
|
||||||
|
if (track.size == 16){
|
||||||
|
if (swappySize < data_len){
|
||||||
|
char * tmp = (char*)realloc(swappyPointer, data_len);
|
||||||
|
if (!tmp){
|
||||||
|
FAIL_MSG("Could not allocate data for PCM endianness swap!");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
swappyPointer = tmp;
|
||||||
|
swappySize = data_len;
|
||||||
|
}
|
||||||
|
for (uint32_t i = 0; i < data_len; i+=2){
|
||||||
|
swappyPointer[i] = tmpData[i+1];
|
||||||
|
swappyPointer[i+1] = tmpData[i];
|
||||||
|
}
|
||||||
|
tmpData = swappyPointer;
|
||||||
|
}
|
||||||
dataheader[0] |= 0x30;
|
dataheader[0] |= 0x30;
|
||||||
}
|
}
|
||||||
if (track.codec == "Nellymoser") {
|
if (track.codec == "Nellymoser"){
|
||||||
if (track.rate == 8000){
|
if (track.rate == 8000){
|
||||||
dataheader[0] |= 0x50;
|
dataheader[0] |= 0x50;
|
||||||
}else if(track.rate == 16000){
|
}else if(track.rate == 16000){
|
||||||
|
@ -243,13 +261,13 @@ namespace Mist {
|
||||||
dataheader[0] |= 0x60;
|
dataheader[0] |= 0x60;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (track.codec == "ALAW") {
|
if (track.codec == "ALAW"){
|
||||||
dataheader[0] |= 0x70;
|
dataheader[0] |= 0x70;
|
||||||
}
|
}
|
||||||
if (track.codec == "ULAW") {
|
if (track.codec == "ULAW"){
|
||||||
dataheader[0] |= 0x80;
|
dataheader[0] |= 0x80;
|
||||||
}
|
}
|
||||||
if (track.codec == "Speex") {
|
if (track.codec == "Speex"){
|
||||||
dataheader[0] |= 0xB0;
|
dataheader[0] |= 0xB0;
|
||||||
}
|
}
|
||||||
if (track.rate >= 44100){
|
if (track.rate >= 44100){
|
||||||
|
@ -367,20 +385,20 @@ namespace Mist {
|
||||||
RTMPStream::snd_cnt += header_len + data_len + steps;
|
RTMPStream::snd_cnt += header_len + data_len + steps;
|
||||||
}
|
}
|
||||||
|
|
||||||
void OutRTMP::sendHeader() {
|
void OutRTMP::sendHeader(){
|
||||||
FLV::Tag tag;
|
FLV::Tag tag;
|
||||||
tag.DTSCMetaInit(myMeta, selectedTracks);
|
tag.DTSCMetaInit(myMeta, selectedTracks);
|
||||||
if (tag.len) {
|
if (tag.len){
|
||||||
myConn.SendNow(RTMPStream::SendMedia(tag));
|
myConn.SendNow(RTMPStream::SendMedia(tag));
|
||||||
}
|
}
|
||||||
|
|
||||||
for (std::set<long unsigned int>::iterator it = selectedTracks.begin(); it != selectedTracks.end(); it++) {
|
for (std::set<long unsigned int>::iterator it = selectedTracks.begin(); it != selectedTracks.end(); it++){
|
||||||
if (myMeta.tracks[*it].type == "video") {
|
if (myMeta.tracks[*it].type == "video"){
|
||||||
if (tag.DTSCVideoInit(myMeta.tracks[*it])){
|
if (tag.DTSCVideoInit(myMeta.tracks[*it])){
|
||||||
myConn.SendNow(RTMPStream::SendMedia(tag));
|
myConn.SendNow(RTMPStream::SendMedia(tag));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (myMeta.tracks[*it].type == "audio") {
|
if (myMeta.tracks[*it].type == "audio"){
|
||||||
if (tag.DTSCAudioInit(myMeta.tracks[*it])){
|
if (tag.DTSCAudioInit(myMeta.tracks[*it])){
|
||||||
myConn.SendNow(RTMPStream::SendMedia(tag));
|
myConn.SendNow(RTMPStream::SendMedia(tag));
|
||||||
}
|
}
|
||||||
|
@ -397,14 +415,14 @@ namespace Mist {
|
||||||
///\param amfReply The data to be sent over RTMP.
|
///\param amfReply The data to be sent over RTMP.
|
||||||
///\param messageType The type of message.
|
///\param messageType The type of message.
|
||||||
///\param streamId The ID of the AMF stream.
|
///\param streamId The ID of the AMF stream.
|
||||||
void OutRTMP::sendCommand(AMF::Object & amfReply, int messageType, int streamId) {
|
void OutRTMP::sendCommand(AMF::Object & amfReply, int messageType, int streamId){
|
||||||
HIGH_MSG("Sending: %s", amfReply.Print().c_str());
|
HIGH_MSG("Sending: %s", amfReply.Print().c_str());
|
||||||
if (messageType == 17) {
|
if (messageType == 17){
|
||||||
myConn.SendNow(RTMPStream::SendChunk(3, messageType, streamId, (char)0 + amfReply.Pack()));
|
myConn.SendNow(RTMPStream::SendChunk(3, messageType, streamId, (char)0 + amfReply.Pack()));
|
||||||
} else {
|
}else{
|
||||||
myConn.SendNow(RTMPStream::SendChunk(3, messageType, streamId, amfReply.Pack()));
|
myConn.SendNow(RTMPStream::SendChunk(3, messageType, streamId, amfReply.Pack()));
|
||||||
}
|
}
|
||||||
} //sendCommand
|
}//sendCommand
|
||||||
|
|
||||||
///\brief Parses a single AMF command message, and sends a direct response through sendCommand().
|
///\brief Parses a single AMF command message, and sends a direct response through sendCommand().
|
||||||
///\param amfData The received request.
|
///\param amfData The received request.
|
||||||
|
@ -413,7 +431,7 @@ namespace Mist {
|
||||||
void OutRTMP::parseAMFCommand(AMF::Object & amfData, int messageType, int streamId) {
|
void OutRTMP::parseAMFCommand(AMF::Object & amfData, int messageType, int streamId) {
|
||||||
MEDIUM_MSG("Received command: %s", amfData.Print().c_str());
|
MEDIUM_MSG("Received command: %s", amfData.Print().c_str());
|
||||||
HIGH_MSG("AMF0 command: %s", amfData.getContentP(0)->StrValue().c_str());
|
HIGH_MSG("AMF0 command: %s", amfData.getContentP(0)->StrValue().c_str());
|
||||||
if (amfData.getContentP(0)->StrValue() == "xsbwtest") {
|
if (amfData.getContentP(0)->StrValue() == "xsbwtest"){
|
||||||
//send a _result reply
|
//send a _result reply
|
||||||
AMF::Object amfReply("container", AMF::AMF0_DDV_CONTAINER);
|
AMF::Object amfReply("container", AMF::AMF0_DDV_CONTAINER);
|
||||||
amfReply.addContent(AMF::Object("", "_error")); //result success
|
amfReply.addContent(AMF::Object("", "_error")); //result success
|
||||||
|
@ -423,9 +441,9 @@ namespace Mist {
|
||||||
sendCommand(amfReply, messageType, streamId);
|
sendCommand(amfReply, messageType, streamId);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (amfData.getContentP(0)->StrValue() == "connect") {
|
if (amfData.getContentP(0)->StrValue() == "connect"){
|
||||||
double objencoding = 0;
|
double objencoding = 0;
|
||||||
if (amfData.getContentP(2)->getContentP("objectEncoding")) {
|
if (amfData.getContentP(2)->getContentP("objectEncoding")){
|
||||||
objencoding = amfData.getContentP(2)->getContentP("objectEncoding")->NumValue();
|
objencoding = amfData.getContentP(2)->getContentP("objectEncoding")->NumValue();
|
||||||
}
|
}
|
||||||
app_name = amfData.getContentP(2)->getContentP("tcUrl")->StrValue();
|
app_name = amfData.getContentP(2)->getContentP("tcUrl")->StrValue();
|
||||||
|
@ -459,8 +477,8 @@ namespace Mist {
|
||||||
//amfReply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null
|
//amfReply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null
|
||||||
//sendCommand(amfReply, messageType, streamId);
|
//sendCommand(amfReply, messageType, streamId);
|
||||||
return;
|
return;
|
||||||
} //connect
|
}//connect
|
||||||
if (amfData.getContentP(0)->StrValue() == "createStream") {
|
if (amfData.getContentP(0)->StrValue() == "createStream"){
|
||||||
//send a _result reply
|
//send a _result reply
|
||||||
AMF::Object amfReply("container", AMF::AMF0_DDV_CONTAINER);
|
AMF::Object amfReply("container", AMF::AMF0_DDV_CONTAINER);
|
||||||
amfReply.addContent(AMF::Object("", "_result")); //result success
|
amfReply.addContent(AMF::Object("", "_result")); //result success
|
||||||
|
@ -470,8 +488,8 @@ namespace Mist {
|
||||||
sendCommand(amfReply, messageType, streamId);
|
sendCommand(amfReply, messageType, streamId);
|
||||||
myConn.SendNow(RTMPStream::SendUSR(0, 1)); //send UCM StreamBegin (0), stream 1
|
myConn.SendNow(RTMPStream::SendUSR(0, 1)); //send UCM StreamBegin (0), stream 1
|
||||||
return;
|
return;
|
||||||
} //createStream
|
}//createStream
|
||||||
if (amfData.getContentP(0)->StrValue() == "ping") {
|
if (amfData.getContentP(0)->StrValue() == "ping"){
|
||||||
//send a _result reply
|
//send a _result reply
|
||||||
AMF::Object amfReply("container", AMF::AMF0_DDV_CONTAINER);
|
AMF::Object amfReply("container", AMF::AMF0_DDV_CONTAINER);
|
||||||
amfReply.addContent(AMF::Object("", "_result")); //result success
|
amfReply.addContent(AMF::Object("", "_result")); //result success
|
||||||
|
@ -480,7 +498,7 @@ namespace Mist {
|
||||||
amfReply.addContent(AMF::Object("", "Pong!")); //stream ID - we use 1
|
amfReply.addContent(AMF::Object("", "Pong!")); //stream ID - we use 1
|
||||||
sendCommand(amfReply, messageType, streamId);
|
sendCommand(amfReply, messageType, streamId);
|
||||||
return;
|
return;
|
||||||
} //createStream
|
}//createStream
|
||||||
if (amfData.getContentP(0)->StrValue() == "closeStream"){
|
if (amfData.getContentP(0)->StrValue() == "closeStream"){
|
||||||
myConn.SendNow(RTMPStream::SendUSR(1, 1)); //send UCM StreamEOF (1), stream 1
|
myConn.SendNow(RTMPStream::SendUSR(1, 1)); //send UCM StreamEOF (1), stream 1
|
||||||
AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER);
|
AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER);
|
||||||
|
@ -497,16 +515,16 @@ namespace Mist {
|
||||||
stop();
|
stop();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (amfData.getContentP(0)->StrValue() == "deleteStream") {
|
if (amfData.getContentP(0)->StrValue() == "deleteStream"){
|
||||||
stop();
|
stop();
|
||||||
onFinish();
|
onFinish();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if ((amfData.getContentP(0)->StrValue() == "FCUnpublish") || (amfData.getContentP(0)->StrValue() == "releaseStream")) {
|
if ((amfData.getContentP(0)->StrValue() == "FCUnpublish") || (amfData.getContentP(0)->StrValue() == "releaseStream")){
|
||||||
// ignored
|
// ignored
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if ((amfData.getContentP(0)->StrValue() == "FCSubscribe")) {
|
if ((amfData.getContentP(0)->StrValue() == "FCSubscribe")){
|
||||||
//send a FCPublish reply
|
//send a FCPublish reply
|
||||||
AMF::Object amfReply("container", AMF::AMF0_DDV_CONTAINER);
|
AMF::Object amfReply("container", AMF::AMF0_DDV_CONTAINER);
|
||||||
amfReply.addContent(AMF::Object("", "onFCSubscribe")); //status reply
|
amfReply.addContent(AMF::Object("", "onFCSubscribe")); //status reply
|
||||||
|
@ -518,8 +536,8 @@ namespace Mist {
|
||||||
amfReply.getContentP(3)->addContent(AMF::Object("description", "Please follow up with play or publish command, as we ignore this command."));
|
amfReply.getContentP(3)->addContent(AMF::Object("description", "Please follow up with play or publish command, as we ignore this command."));
|
||||||
sendCommand(amfReply, messageType, streamId);
|
sendCommand(amfReply, messageType, streamId);
|
||||||
return;
|
return;
|
||||||
} //FCPublish
|
}//FCPublish
|
||||||
if ((amfData.getContentP(0)->StrValue() == "FCPublish")) {
|
if ((amfData.getContentP(0)->StrValue() == "FCPublish")){
|
||||||
//send a FCPublish reply
|
//send a FCPublish reply
|
||||||
AMF::Object amfReply("container", AMF::AMF0_DDV_CONTAINER);
|
AMF::Object amfReply("container", AMF::AMF0_DDV_CONTAINER);
|
||||||
amfReply.addContent(AMF::Object("", "onFCPublish")); //status reply
|
amfReply.addContent(AMF::Object("", "onFCPublish")); //status reply
|
||||||
|
@ -530,8 +548,8 @@ namespace Mist {
|
||||||
amfReply.getContentP(3)->addContent(AMF::Object("description", "Please follow up with publish command, as we ignore this command."));
|
amfReply.getContentP(3)->addContent(AMF::Object("description", "Please follow up with publish command, as we ignore this command."));
|
||||||
sendCommand(amfReply, messageType, streamId);
|
sendCommand(amfReply, messageType, streamId);
|
||||||
return;
|
return;
|
||||||
} //FCPublish
|
}//FCPublish
|
||||||
if (amfData.getContentP(0)->StrValue() == "releaseStream") {
|
if (amfData.getContentP(0)->StrValue() == "releaseStream"){
|
||||||
//send a _result reply
|
//send a _result reply
|
||||||
AMF::Object amfReply("container", AMF::AMF0_DDV_CONTAINER);
|
AMF::Object amfReply("container", AMF::AMF0_DDV_CONTAINER);
|
||||||
amfReply.addContent(AMF::Object("", "_result")); //result success
|
amfReply.addContent(AMF::Object("", "_result")); //result success
|
||||||
|
@ -541,7 +559,7 @@ namespace Mist {
|
||||||
sendCommand(amfReply, messageType, streamId);
|
sendCommand(amfReply, messageType, streamId);
|
||||||
return;
|
return;
|
||||||
}//releaseStream
|
}//releaseStream
|
||||||
if ((amfData.getContentP(0)->StrValue() == "getStreamLength") || (amfData.getContentP(0)->StrValue() == "getMovLen")) {
|
if ((amfData.getContentP(0)->StrValue() == "getStreamLength") || (amfData.getContentP(0)->StrValue() == "getMovLen")){
|
||||||
//send a _result reply
|
//send a _result reply
|
||||||
AMF::Object amfReply("container", AMF::AMF0_DDV_CONTAINER);
|
AMF::Object amfReply("container", AMF::AMF0_DDV_CONTAINER);
|
||||||
amfReply.addContent(AMF::Object("", "_result")); //result success
|
amfReply.addContent(AMF::Object("", "_result")); //result success
|
||||||
|
@ -550,9 +568,9 @@ namespace Mist {
|
||||||
amfReply.addContent(AMF::Object("", (double)0)); //zero length
|
amfReply.addContent(AMF::Object("", (double)0)); //zero length
|
||||||
sendCommand(amfReply, messageType, streamId);
|
sendCommand(amfReply, messageType, streamId);
|
||||||
return;
|
return;
|
||||||
} //getStreamLength
|
}//getStreamLength
|
||||||
if ((amfData.getContentP(0)->StrValue() == "publish")) {
|
if ((amfData.getContentP(0)->StrValue() == "publish")){
|
||||||
if (amfData.getContentP(3)) {
|
if (amfData.getContentP(3)){
|
||||||
streamName = Encodings::URL::decode(amfData.getContentP(3)->StrValue());
|
streamName = Encodings::URL::decode(amfData.getContentP(3)->StrValue());
|
||||||
|
|
||||||
if (streamName.find('/')){
|
if (streamName.find('/')){
|
||||||
|
@ -596,8 +614,8 @@ namespace Mist {
|
||||||
amfReply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337));
|
amfReply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337));
|
||||||
sendCommand(amfReply, messageType, streamId);
|
sendCommand(amfReply, messageType, streamId);
|
||||||
return;
|
return;
|
||||||
} //getStreamLength
|
}//getStreamLength
|
||||||
if (amfData.getContentP(0)->StrValue() == "checkBandwidth") {
|
if (amfData.getContentP(0)->StrValue() == "checkBandwidth"){
|
||||||
//send a _result reply
|
//send a _result reply
|
||||||
AMF::Object amfReply("container", AMF::AMF0_DDV_CONTAINER);
|
AMF::Object amfReply("container", AMF::AMF0_DDV_CONTAINER);
|
||||||
amfReply.addContent(AMF::Object("", "_result")); //result success
|
amfReply.addContent(AMF::Object("", "_result")); //result success
|
||||||
|
@ -606,8 +624,11 @@ namespace Mist {
|
||||||
amfReply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL)); //null - command info
|
amfReply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL)); //null - command info
|
||||||
sendCommand(amfReply, messageType, streamId);
|
sendCommand(amfReply, messageType, streamId);
|
||||||
return;
|
return;
|
||||||
} //checkBandwidth
|
}//checkBandwidth
|
||||||
if ((amfData.getContentP(0)->StrValue() == "play") || (amfData.getContentP(0)->StrValue() == "play2")) {
|
if (amfData.getContentP(0)->StrValue() == "onBWDone"){
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if ((amfData.getContentP(0)->StrValue() == "play") || (amfData.getContentP(0)->StrValue() == "play2")){
|
||||||
//set reply number and stream name, actual reply is sent up in the ss.spool() handler
|
//set reply number and stream name, actual reply is sent up in the ss.spool() handler
|
||||||
int playTransaction = amfData.getContentP(1)->NumValue();
|
int playTransaction = amfData.getContentP(1)->NumValue();
|
||||||
int playMessageType = messageType;
|
int playMessageType = messageType;
|
||||||
|
@ -646,7 +667,7 @@ namespace Mist {
|
||||||
amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337));
|
amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337));
|
||||||
sendCommand(amfreply, playMessageType, playStreamId);
|
sendCommand(amfreply, playMessageType, playStreamId);
|
||||||
//send streamisrecorded if stream, well, is recorded.
|
//send streamisrecorded if stream, well, is recorded.
|
||||||
if (myMeta.vod) { //isMember("length") && Strm.metadata["length"].asInt() > 0){
|
if (myMeta.vod){//isMember("length") && Strm.metadata["length"].asInt() > 0){
|
||||||
myConn.SendNow(RTMPStream::SendUSR(4, 1)); //send UCM StreamIsRecorded (4), stream 1
|
myConn.SendNow(RTMPStream::SendUSR(4, 1)); //send UCM StreamIsRecorded (4), stream 1
|
||||||
}
|
}
|
||||||
//send streambegin
|
//send streambegin
|
||||||
|
@ -673,8 +694,8 @@ namespace Mist {
|
||||||
|
|
||||||
parseData = true;
|
parseData = true;
|
||||||
return;
|
return;
|
||||||
} //play
|
}//play
|
||||||
if ((amfData.getContentP(0)->StrValue() == "seek")) {
|
if ((amfData.getContentP(0)->StrValue() == "seek")){
|
||||||
//set reply number and stream name, actual reply is sent up in the ss.spool() handler
|
//set reply number and stream name, actual reply is sent up in the ss.spool() handler
|
||||||
int playTransaction = amfData.getContentP(1)->NumValue();
|
int playTransaction = amfData.getContentP(1)->NumValue();
|
||||||
int playMessageType = messageType;
|
int playMessageType = messageType;
|
||||||
|
@ -706,7 +727,7 @@ namespace Mist {
|
||||||
amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337));
|
amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337));
|
||||||
sendCommand(amfreply, playMessageType, playStreamId);
|
sendCommand(amfreply, playMessageType, playStreamId);
|
||||||
//send streamisrecorded if stream, well, is recorded.
|
//send streamisrecorded if stream, well, is recorded.
|
||||||
if (myMeta.vod) { //isMember("length") && Strm.metadata["length"].asInt() > 0){
|
if (myMeta.vod){//isMember("length") && Strm.metadata["length"].asInt() > 0){
|
||||||
myConn.SendNow(RTMPStream::SendUSR(4, 1)); //send UCM StreamIsRecorded (4), stream 1
|
myConn.SendNow(RTMPStream::SendUSR(4, 1)); //send UCM StreamIsRecorded (4), stream 1
|
||||||
}
|
}
|
||||||
//send streambegin
|
//send streambegin
|
||||||
|
@ -733,11 +754,11 @@ namespace Mist {
|
||||||
myConn.SendNow(RTMPStream::SendUSR(32, 1)); //send UCM no clue?, stream 1
|
myConn.SendNow(RTMPStream::SendUSR(32, 1)); //send UCM no clue?, stream 1
|
||||||
|
|
||||||
return;
|
return;
|
||||||
} //seek
|
}//seek
|
||||||
if ((amfData.getContentP(0)->StrValue() == "pauseRaw") || (amfData.getContentP(0)->StrValue() == "pause")) {
|
if ((amfData.getContentP(0)->StrValue() == "pauseRaw") || (amfData.getContentP(0)->StrValue() == "pause")){
|
||||||
int playMessageType = messageType;
|
int playMessageType = messageType;
|
||||||
int playStreamId = streamId;
|
int playStreamId = streamId;
|
||||||
if (amfData.getContentP(3)->NumValue()) {
|
if (amfData.getContentP(3)->NumValue()){
|
||||||
parseData = false;
|
parseData = false;
|
||||||
//send a status reply
|
//send a status reply
|
||||||
AMF::Object amfReply("container", AMF::AMF0_DDV_CONTAINER);
|
AMF::Object amfReply("container", AMF::AMF0_DDV_CONTAINER);
|
||||||
|
@ -751,7 +772,7 @@ namespace Mist {
|
||||||
amfReply.getContentP(3)->addContent(AMF::Object("details", "DDV"));
|
amfReply.getContentP(3)->addContent(AMF::Object("details", "DDV"));
|
||||||
amfReply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337));
|
amfReply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337));
|
||||||
sendCommand(amfReply, playMessageType, playStreamId);
|
sendCommand(amfReply, playMessageType, playStreamId);
|
||||||
} else {
|
}else{
|
||||||
parseData = true;
|
parseData = true;
|
||||||
//send a status reply
|
//send a status reply
|
||||||
AMF::Object amfReply("container", AMF::AMF0_DDV_CONTAINER);
|
AMF::Object amfReply("container", AMF::AMF0_DDV_CONTAINER);
|
||||||
|
@ -767,13 +788,13 @@ namespace Mist {
|
||||||
sendCommand(amfReply, playMessageType, playStreamId);
|
sendCommand(amfReply, playMessageType, playStreamId);
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
} //seek
|
}//seek
|
||||||
if (amfData.getContentP(0)->StrValue() == "_error") {
|
if (amfData.getContentP(0)->StrValue() == "_error"){
|
||||||
WARN_MSG("Received error response: %s", amfData.Print().c_str());
|
WARN_MSG("Received error response: %s", amfData.Print().c_str());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if ((amfData.getContentP(0)->StrValue() == "_result") || (amfData.getContentP(0)->StrValue() == "onFCPublish") || (amfData.getContentP(0)->StrValue() == "onStatus")) {
|
if ((amfData.getContentP(0)->StrValue() == "_result") || (amfData.getContentP(0)->StrValue() == "onFCPublish") || (amfData.getContentP(0)->StrValue() == "onStatus")){
|
||||||
//Results are ignored. We don't really care.
|
//Other results are ignored. We don't really care.
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -785,11 +806,11 @@ namespace Mist {
|
||||||
amfReply.addContent(AMF::Object("", amfData.getContentP(0)->StrValue())); //null - command info
|
amfReply.addContent(AMF::Object("", amfData.getContentP(0)->StrValue())); //null - command info
|
||||||
amfReply.addContent(AMF::Object("", "Command not implemented or recognized")); //stream ID?
|
amfReply.addContent(AMF::Object("", "Command not implemented or recognized")); //stream ID?
|
||||||
sendCommand(amfReply, messageType, streamId);
|
sendCommand(amfReply, messageType, streamId);
|
||||||
} //parseAMFCommand
|
}//parseAMFCommand
|
||||||
|
|
||||||
///\brief Gets and parses one RTMP chunk at a time.
|
///\brief Gets and parses one RTMP chunk at a time.
|
||||||
///\param inputBuffer A buffer filled with chunk data.
|
///\param inputBuffer A buffer filled with chunk data.
|
||||||
void OutRTMP::parseChunk(Socket::Buffer & inputBuffer) {
|
void OutRTMP::parseChunk(Socket::Buffer & inputBuffer){
|
||||||
//for DTSC conversion
|
//for DTSC conversion
|
||||||
static std::stringstream prebuffer; // Temporary buffer before sending real data
|
static std::stringstream prebuffer; // Temporary buffer before sending real data
|
||||||
//for chunk parsing
|
//for chunk parsing
|
||||||
|
@ -800,18 +821,18 @@ namespace Mist {
|
||||||
static AMF::Object3 amf3data("empty", AMF::AMF3_DDV_CONTAINER);
|
static AMF::Object3 amf3data("empty", AMF::AMF3_DDV_CONTAINER);
|
||||||
static AMF::Object3 amf3elem("empty", AMF::AMF3_DDV_CONTAINER);
|
static AMF::Object3 amf3elem("empty", AMF::AMF3_DDV_CONTAINER);
|
||||||
|
|
||||||
while (next.Parse(inputBuffer)) {
|
while (next.Parse(inputBuffer)){
|
||||||
|
|
||||||
//send ACK if we received a whole window
|
//send ACK if we received a whole window
|
||||||
if ((RTMPStream::rec_cnt - RTMPStream::rec_window_at > RTMPStream::rec_window_size)) {
|
if ((RTMPStream::rec_cnt - RTMPStream::rec_window_at > RTMPStream::rec_window_size)){
|
||||||
RTMPStream::rec_window_at = RTMPStream::rec_cnt;
|
RTMPStream::rec_window_at = RTMPStream::rec_cnt;
|
||||||
myConn.SendNow(RTMPStream::SendCTL(3, RTMPStream::rec_cnt)); //send ack (msg 3)
|
myConn.SendNow(RTMPStream::SendCTL(3, RTMPStream::rec_cnt)); //send ack (msg 3)
|
||||||
}
|
}
|
||||||
|
|
||||||
switch (next.msg_type_id) {
|
switch (next.msg_type_id){
|
||||||
case 0: //does not exist
|
case 0: //does not exist
|
||||||
WARN_MSG("UNKN: Received a zero-type message. Possible data corruption? Aborting!");
|
WARN_MSG("UNKN: Received a zero-type message. Possible data corruption? Aborting!");
|
||||||
while (inputBuffer.size()) {
|
while (inputBuffer.size()){
|
||||||
inputBuffer.get().clear();
|
inputBuffer.get().clear();
|
||||||
}
|
}
|
||||||
stop();
|
stop();
|
||||||
|
@ -830,7 +851,7 @@ namespace Mist {
|
||||||
RTMPStream::snd_window_at = ntohl(*(int *)next.data.c_str());
|
RTMPStream::snd_window_at = ntohl(*(int *)next.data.c_str());
|
||||||
RTMPStream::snd_window_at = RTMPStream::snd_cnt;
|
RTMPStream::snd_window_at = RTMPStream::snd_cnt;
|
||||||
break;
|
break;
|
||||||
case 4: {
|
case 4:{
|
||||||
//2 bytes event type, rest = event data
|
//2 bytes event type, rest = event data
|
||||||
//types:
|
//types:
|
||||||
//0 = stream begin, 4 bytes ID
|
//0 = stream begin, 4 bytes ID
|
||||||
|
@ -842,7 +863,7 @@ namespace Mist {
|
||||||
//7 = pingresponse, 4 bytes data
|
//7 = pingresponse, 4 bytes data
|
||||||
//we don't need to process this
|
//we don't need to process this
|
||||||
short int ucmtype = ntohs(*(short int *)next.data.c_str());
|
short int ucmtype = ntohs(*(short int *)next.data.c_str());
|
||||||
switch (ucmtype) {
|
switch (ucmtype){
|
||||||
case 0:
|
case 0:
|
||||||
MEDIUM_MSG("CTRL: UCM StreamBegin %i", ntohl(*((int *)(next.data.c_str() + 2))));
|
MEDIUM_MSG("CTRL: UCM StreamBegin %i", ntohl(*((int *)(next.data.c_str() + 2))));
|
||||||
break;
|
break;
|
||||||
|
@ -884,10 +905,10 @@ namespace Mist {
|
||||||
break;
|
break;
|
||||||
case 8: //audio data
|
case 8: //audio data
|
||||||
case 9: //video data
|
case 9: //video data
|
||||||
case 18: {//meta data
|
case 18:{//meta data
|
||||||
static std::map<unsigned int, AMF::Object> pushMeta;
|
static std::map<unsigned int, AMF::Object> pushMeta;
|
||||||
static std::map<uint64_t, uint64_t> lastTagTime;
|
static std::map<uint64_t, uint64_t> lastTagTime;
|
||||||
if (!isInitialized) {
|
if (!isInitialized){
|
||||||
MEDIUM_MSG("Received useless media data");
|
MEDIUM_MSG("Received useless media data");
|
||||||
onFinish();
|
onFinish();
|
||||||
break;
|
break;
|
||||||
|
@ -920,6 +941,15 @@ namespace Mist {
|
||||||
onFinish();
|
onFinish();
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
if (myMeta.tracks[reTrack].codec == "PCM" && myMeta.tracks[reTrack].size == 16){
|
||||||
|
char * ptr = F.getData();
|
||||||
|
uint32_t ptrSize = F.getDataLen();
|
||||||
|
for (uint32_t i = 0; i < ptrSize; i+=2){
|
||||||
|
char tmpchar = ptr[i];
|
||||||
|
ptr[i] = ptr[i+1];
|
||||||
|
ptr[i+1] = tmpchar;
|
||||||
|
}
|
||||||
|
}
|
||||||
thisPacket.genericFill(tagTime, F.offset(), reTrack, F.getData(), F.getDataLen(), 0, F.isKeyframe);
|
thisPacket.genericFill(tagTime, F.offset(), reTrack, F.getData(), F.getDataLen(), 0, F.isKeyframe);
|
||||||
ltt = tagTime;
|
ltt = tagTime;
|
||||||
if (!nProxy.userClient.getData()){
|
if (!nProxy.userClient.getData()){
|
||||||
|
@ -937,24 +967,24 @@ namespace Mist {
|
||||||
case 16:
|
case 16:
|
||||||
MEDIUM_MSG("Received AMF3 shared object");
|
MEDIUM_MSG("Received AMF3 shared object");
|
||||||
break;
|
break;
|
||||||
case 17: {
|
case 17:{
|
||||||
MEDIUM_MSG("Received AMF3 command message");
|
MEDIUM_MSG("Received AMF3 command message");
|
||||||
if (next.data[0] != 0) {
|
if (next.data[0] != 0){
|
||||||
next.data = next.data.substr(1);
|
next.data = next.data.substr(1);
|
||||||
amf3data = AMF::parse3(next.data);
|
amf3data = AMF::parse3(next.data);
|
||||||
MEDIUM_MSG("AMF3: %s", amf3data.Print().c_str());
|
MEDIUM_MSG("AMF3: %s", amf3data.Print().c_str());
|
||||||
} else {
|
}else{
|
||||||
MEDIUM_MSG("Received AMF3-0 command message");
|
MEDIUM_MSG("Received AMF3-0 command message");
|
||||||
next.data = next.data.substr(1);
|
next.data = next.data.substr(1);
|
||||||
amfdata = AMF::parse(next.data);
|
amfdata = AMF::parse(next.data);
|
||||||
parseAMFCommand(amfdata, 17, next.msg_stream_id);
|
parseAMFCommand(amfdata, 17, next.msg_stream_id);
|
||||||
} //parsing AMF0-style
|
}//parsing AMF0-style
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case 19:
|
case 19:
|
||||||
MEDIUM_MSG("Received AMF0 shared object");
|
MEDIUM_MSG("Received AMF0 shared object");
|
||||||
break;
|
break;
|
||||||
case 20: { //AMF0 command message
|
case 20:{//AMF0 command message
|
||||||
amfdata = AMF::parse(next.data);
|
amfdata = AMF::parse(next.data);
|
||||||
parseAMFCommand(amfdata, 20, next.msg_stream_id);
|
parseAMFCommand(amfdata, 20, next.msg_stream_id);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Reference in a new issue