From cef78b7d086613e89585672f0315106c402a7539 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Mon, 19 Jun 2017 12:38:35 +0200 Subject: [PATCH] PCM support in Flash-based protocols --- lib/flv_tag.cpp | 6 +- src/input/input_flv.cpp | 12 ++ src/output/output_hds.cpp | 12 +- src/output/output_progressive_flv.cpp | 12 +- src/output/output_rtmp.cpp | 168 +++++++++++++++----------- 5 files changed, 138 insertions(+), 72 deletions(-) diff --git a/lib/flv_tag.cpp b/lib/flv_tag.cpp index a6ab0904..380539c8 100644 --- a/lib/flv_tag.cpp +++ b/lib/flv_tag.cpp @@ -182,7 +182,11 @@ const char * FLV::Tag::getVideoCodec() { const char * FLV::Tag::getAudioCodec() { switch (data[11] & 0xF0) { case 0x00: - return "PCMPE"; + if (data[11] & 0x02){ + return "PCMPE";//unknown endianness + }else{ + return "PCM";//8 bit is always regular PCM + } case 0x10: return "ADPCM"; case 0x20: diff --git a/src/input/input_flv.cpp b/src/input/input_flv.cpp index 66ffa930..01163471 100644 --- a/src/input/input_flv.cpp +++ b/src/input/input_flv.cpp @@ -132,6 +132,18 @@ namespace Mist { return getNext(); } thisPacket.genericFill(tmpTag.tagTime(), tmpTag.offset(), tmpTag.getTrackID(), tmpTag.getData(), tmpTag.getDataLen(), lastBytePos, tmpTag.isKeyframe); //init packet from tmpTags data + + DTSC::Track & trk = myMeta.tracks[tmpTag.getTrackID()]; + if (trk.codec == "PCM" && trk.size == 16){ + char * ptr = 0; + uint32_t ptrSize = 0; + thisPacket.getString("data", ptr, ptrSize); + for (uint32_t i = 0; i < ptrSize; i+=2){ + char tmpchar = ptr[i]; + ptr[i] = ptr[i+1]; + ptr[i+1] = tmpchar; + } + } } void inputFLV::seek(int seekTime) { diff --git a/src/output/output_hds.cpp b/src/output/output_hds.cpp index d88cd025..b786dc15 100644 --- a/src/output/output_hds.cpp +++ b/src/output/output_hds.cpp @@ -172,7 +172,17 @@ namespace Mist { H.Chunkify("", 0, myConn); return; } - tag.DTSCLoader(thisPacket, myMeta.tracks[thisPacket.getTrackId()]); + DTSC::Track & trk = myMeta.tracks[thisPacket.getTrackId()]; + tag.DTSCLoader(thisPacket, trk); + if (trk.codec == "PCM" && trk.size == 16){ + char * ptr = tag.getData(); + uint32_t ptrSize = tag.getDataLen(); + for (uint32_t i = 0; i < ptrSize; i+=2){ + char tmpchar = ptr[i]; + ptr[i] = ptr[i+1]; + ptr[i+1] = tmpchar; + } + } if (tag.len){ H.Chunkify(tag.data, tag.len, myConn); } diff --git a/src/output/output_progressive_flv.cpp b/src/output/output_progressive_flv.cpp index 49dc174a..a2004e87 100644 --- a/src/output/output_progressive_flv.cpp +++ b/src/output/output_progressive_flv.cpp @@ -31,7 +31,17 @@ namespace Mist { } void OutProgressiveFLV::sendNext(){ - tag.DTSCLoader(thisPacket, myMeta.tracks[thisPacket.getTrackId()]); + DTSC::Track & trk = myMeta.tracks[thisPacket.getTrackId()]; + tag.DTSCLoader(thisPacket, trk); + if (trk.codec == "PCM" && trk.size == 16){ + char * ptr = tag.getData(); + uint32_t ptrSize = tag.getDataLen(); + for (uint32_t i = 0; i < ptrSize; i+=2){ + char tmpchar = ptr[i]; + ptr[i] = ptr[i+1]; + ptr[i+1] = tmpchar; + } + } myConn.SendNow(tag.data, tag.len); } diff --git a/src/output/output_rtmp.cpp b/src/output/output_rtmp.cpp index 2678fa99..46144d63 100644 --- a/src/output/output_rtmp.cpp +++ b/src/output/output_rtmp.cpp @@ -121,7 +121,7 @@ namespace Mist { } - void OutRTMP::init(Util::Config * cfg) { + void OutRTMP::init(Util::Config * cfg){ Output::init(cfg); capa["name"] = "RTMP"; capa["desc"] = "Enables ingest and output over Adobe's RTMP protocol."; @@ -150,7 +150,7 @@ namespace Mist { config = cfg; } - void OutRTMP::sendNext() { + void OutRTMP::sendNext(){ //If there are now more selectable tracks, select the new track and do a seek to the current timestamp //Set sentHeader to false to force it to send init data @@ -173,14 +173,16 @@ namespace Mist { } - char rtmpheader[] = {0, //byte 0 = cs_id | ch_type + char rtmpheader[] ={0, //byte 0 = cs_id | ch_type 0, 0, 0, //bytes 1-3 = timestamp 0, 0, 0, //bytes 4-6 = length 0x12, //byte 7 = msg_type_id 1, 0, 0, 0, //bytes 8-11 = msg_stream_id = 1 0, 0, 0, 0}; //bytes 12-15 = extended timestamp - char dataheader[] = {0, 0, 0, 0, 0}; + char dataheader[] ={0, 0, 0, 0, 0}; unsigned int dheader_len = 1; + static char * swappyPointer = 0; + static uint32_t swappySize = 0; char * tmpData = 0;//pointer to raw media data unsigned int data_len = 0;//length of processed media data thisPacket.getString("data", tmpData, data_len); @@ -228,13 +230,29 @@ namespace Mist { dataheader[0] |= 0x20; } } - if (track.codec == "ADPCM") { + if (track.codec == "ADPCM"){ dataheader[0] |= 0x10; } - if (track.codec == "PCM") { + if (track.codec == "PCM"){ + if (track.size == 16){ + if (swappySize < data_len){ + char * tmp = (char*)realloc(swappyPointer, data_len); + if (!tmp){ + FAIL_MSG("Could not allocate data for PCM endianness swap!"); + return; + } + swappyPointer = tmp; + swappySize = data_len; + } + for (uint32_t i = 0; i < data_len; i+=2){ + swappyPointer[i] = tmpData[i+1]; + swappyPointer[i+1] = tmpData[i]; + } + tmpData = swappyPointer; + } dataheader[0] |= 0x30; } - if (track.codec == "Nellymoser") { + if (track.codec == "Nellymoser"){ if (track.rate == 8000){ dataheader[0] |= 0x50; }else if(track.rate == 16000){ @@ -243,13 +261,13 @@ namespace Mist { dataheader[0] |= 0x60; } } - if (track.codec == "ALAW") { + if (track.codec == "ALAW"){ dataheader[0] |= 0x70; } - if (track.codec == "ULAW") { + if (track.codec == "ULAW"){ dataheader[0] |= 0x80; } - if (track.codec == "Speex") { + if (track.codec == "Speex"){ dataheader[0] |= 0xB0; } if (track.rate >= 44100){ @@ -367,20 +385,20 @@ namespace Mist { RTMPStream::snd_cnt += header_len + data_len + steps; } - void OutRTMP::sendHeader() { + void OutRTMP::sendHeader(){ FLV::Tag tag; tag.DTSCMetaInit(myMeta, selectedTracks); - if (tag.len) { + if (tag.len){ myConn.SendNow(RTMPStream::SendMedia(tag)); } - for (std::set::iterator it = selectedTracks.begin(); it != selectedTracks.end(); it++) { - if (myMeta.tracks[*it].type == "video") { + for (std::set::iterator it = selectedTracks.begin(); it != selectedTracks.end(); it++){ + if (myMeta.tracks[*it].type == "video"){ if (tag.DTSCVideoInit(myMeta.tracks[*it])){ myConn.SendNow(RTMPStream::SendMedia(tag)); } } - if (myMeta.tracks[*it].type == "audio") { + if (myMeta.tracks[*it].type == "audio"){ if (tag.DTSCAudioInit(myMeta.tracks[*it])){ myConn.SendNow(RTMPStream::SendMedia(tag)); } @@ -397,14 +415,14 @@ namespace Mist { ///\param amfReply The data to be sent over RTMP. ///\param messageType The type of message. ///\param streamId The ID of the AMF stream. - void OutRTMP::sendCommand(AMF::Object & amfReply, int messageType, int streamId) { + void OutRTMP::sendCommand(AMF::Object & amfReply, int messageType, int streamId){ HIGH_MSG("Sending: %s", amfReply.Print().c_str()); - if (messageType == 17) { + if (messageType == 17){ myConn.SendNow(RTMPStream::SendChunk(3, messageType, streamId, (char)0 + amfReply.Pack())); - } else { + }else{ myConn.SendNow(RTMPStream::SendChunk(3, messageType, streamId, amfReply.Pack())); } - } //sendCommand + }//sendCommand ///\brief Parses a single AMF command message, and sends a direct response through sendCommand(). ///\param amfData The received request. @@ -413,7 +431,7 @@ namespace Mist { void OutRTMP::parseAMFCommand(AMF::Object & amfData, int messageType, int streamId) { MEDIUM_MSG("Received command: %s", amfData.Print().c_str()); HIGH_MSG("AMF0 command: %s", amfData.getContentP(0)->StrValue().c_str()); - if (amfData.getContentP(0)->StrValue() == "xsbwtest") { + if (amfData.getContentP(0)->StrValue() == "xsbwtest"){ //send a _result reply AMF::Object amfReply("container", AMF::AMF0_DDV_CONTAINER); amfReply.addContent(AMF::Object("", "_error")); //result success @@ -423,9 +441,9 @@ namespace Mist { sendCommand(amfReply, messageType, streamId); return; } - if (amfData.getContentP(0)->StrValue() == "connect") { + if (amfData.getContentP(0)->StrValue() == "connect"){ double objencoding = 0; - if (amfData.getContentP(2)->getContentP("objectEncoding")) { + if (amfData.getContentP(2)->getContentP("objectEncoding")){ objencoding = amfData.getContentP(2)->getContentP("objectEncoding")->NumValue(); } app_name = amfData.getContentP(2)->getContentP("tcUrl")->StrValue(); @@ -459,8 +477,8 @@ namespace Mist { //amfReply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null //sendCommand(amfReply, messageType, streamId); return; - } //connect - if (amfData.getContentP(0)->StrValue() == "createStream") { + }//connect + if (amfData.getContentP(0)->StrValue() == "createStream"){ //send a _result reply AMF::Object amfReply("container", AMF::AMF0_DDV_CONTAINER); amfReply.addContent(AMF::Object("", "_result")); //result success @@ -470,8 +488,8 @@ namespace Mist { sendCommand(amfReply, messageType, streamId); myConn.SendNow(RTMPStream::SendUSR(0, 1)); //send UCM StreamBegin (0), stream 1 return; - } //createStream - if (amfData.getContentP(0)->StrValue() == "ping") { + }//createStream + if (amfData.getContentP(0)->StrValue() == "ping"){ //send a _result reply AMF::Object amfReply("container", AMF::AMF0_DDV_CONTAINER); amfReply.addContent(AMF::Object("", "_result")); //result success @@ -480,7 +498,7 @@ namespace Mist { amfReply.addContent(AMF::Object("", "Pong!")); //stream ID - we use 1 sendCommand(amfReply, messageType, streamId); return; - } //createStream + }//createStream if (amfData.getContentP(0)->StrValue() == "closeStream"){ myConn.SendNow(RTMPStream::SendUSR(1, 1)); //send UCM StreamEOF (1), stream 1 AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER); @@ -497,16 +515,16 @@ namespace Mist { stop(); return; } - if (amfData.getContentP(0)->StrValue() == "deleteStream") { + if (amfData.getContentP(0)->StrValue() == "deleteStream"){ stop(); onFinish(); return; } - if ((amfData.getContentP(0)->StrValue() == "FCUnpublish") || (amfData.getContentP(0)->StrValue() == "releaseStream")) { + if ((amfData.getContentP(0)->StrValue() == "FCUnpublish") || (amfData.getContentP(0)->StrValue() == "releaseStream")){ // ignored return; } - if ((amfData.getContentP(0)->StrValue() == "FCSubscribe")) { + if ((amfData.getContentP(0)->StrValue() == "FCSubscribe")){ //send a FCPublish reply AMF::Object amfReply("container", AMF::AMF0_DDV_CONTAINER); amfReply.addContent(AMF::Object("", "onFCSubscribe")); //status reply @@ -518,8 +536,8 @@ namespace Mist { amfReply.getContentP(3)->addContent(AMF::Object("description", "Please follow up with play or publish command, as we ignore this command.")); sendCommand(amfReply, messageType, streamId); return; - } //FCPublish - if ((amfData.getContentP(0)->StrValue() == "FCPublish")) { + }//FCPublish + if ((amfData.getContentP(0)->StrValue() == "FCPublish")){ //send a FCPublish reply AMF::Object amfReply("container", AMF::AMF0_DDV_CONTAINER); amfReply.addContent(AMF::Object("", "onFCPublish")); //status reply @@ -530,8 +548,8 @@ namespace Mist { amfReply.getContentP(3)->addContent(AMF::Object("description", "Please follow up with publish command, as we ignore this command.")); sendCommand(amfReply, messageType, streamId); return; - } //FCPublish - if (amfData.getContentP(0)->StrValue() == "releaseStream") { + }//FCPublish + if (amfData.getContentP(0)->StrValue() == "releaseStream"){ //send a _result reply AMF::Object amfReply("container", AMF::AMF0_DDV_CONTAINER); amfReply.addContent(AMF::Object("", "_result")); //result success @@ -541,7 +559,7 @@ namespace Mist { sendCommand(amfReply, messageType, streamId); return; }//releaseStream - if ((amfData.getContentP(0)->StrValue() == "getStreamLength") || (amfData.getContentP(0)->StrValue() == "getMovLen")) { + if ((amfData.getContentP(0)->StrValue() == "getStreamLength") || (amfData.getContentP(0)->StrValue() == "getMovLen")){ //send a _result reply AMF::Object amfReply("container", AMF::AMF0_DDV_CONTAINER); amfReply.addContent(AMF::Object("", "_result")); //result success @@ -550,9 +568,9 @@ namespace Mist { amfReply.addContent(AMF::Object("", (double)0)); //zero length sendCommand(amfReply, messageType, streamId); return; - } //getStreamLength - if ((amfData.getContentP(0)->StrValue() == "publish")) { - if (amfData.getContentP(3)) { + }//getStreamLength + if ((amfData.getContentP(0)->StrValue() == "publish")){ + if (amfData.getContentP(3)){ streamName = Encodings::URL::decode(amfData.getContentP(3)->StrValue()); if (streamName.find('/')){ @@ -596,8 +614,8 @@ namespace Mist { amfReply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337)); sendCommand(amfReply, messageType, streamId); return; - } //getStreamLength - if (amfData.getContentP(0)->StrValue() == "checkBandwidth") { + }//getStreamLength + if (amfData.getContentP(0)->StrValue() == "checkBandwidth"){ //send a _result reply AMF::Object amfReply("container", AMF::AMF0_DDV_CONTAINER); amfReply.addContent(AMF::Object("", "_result")); //result success @@ -606,8 +624,11 @@ namespace Mist { amfReply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL)); //null - command info sendCommand(amfReply, messageType, streamId); return; - } //checkBandwidth - if ((amfData.getContentP(0)->StrValue() == "play") || (amfData.getContentP(0)->StrValue() == "play2")) { + }//checkBandwidth + if (amfData.getContentP(0)->StrValue() == "onBWDone"){ + return; + } + if ((amfData.getContentP(0)->StrValue() == "play") || (amfData.getContentP(0)->StrValue() == "play2")){ //set reply number and stream name, actual reply is sent up in the ss.spool() handler int playTransaction = amfData.getContentP(1)->NumValue(); int playMessageType = messageType; @@ -646,7 +667,7 @@ namespace Mist { amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337)); sendCommand(amfreply, playMessageType, playStreamId); //send streamisrecorded if stream, well, is recorded. - if (myMeta.vod) { //isMember("length") && Strm.metadata["length"].asInt() > 0){ + if (myMeta.vod){//isMember("length") && Strm.metadata["length"].asInt() > 0){ myConn.SendNow(RTMPStream::SendUSR(4, 1)); //send UCM StreamIsRecorded (4), stream 1 } //send streambegin @@ -673,8 +694,8 @@ namespace Mist { parseData = true; return; - } //play - if ((amfData.getContentP(0)->StrValue() == "seek")) { + }//play + if ((amfData.getContentP(0)->StrValue() == "seek")){ //set reply number and stream name, actual reply is sent up in the ss.spool() handler int playTransaction = amfData.getContentP(1)->NumValue(); int playMessageType = messageType; @@ -706,7 +727,7 @@ namespace Mist { amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337)); sendCommand(amfreply, playMessageType, playStreamId); //send streamisrecorded if stream, well, is recorded. - if (myMeta.vod) { //isMember("length") && Strm.metadata["length"].asInt() > 0){ + if (myMeta.vod){//isMember("length") && Strm.metadata["length"].asInt() > 0){ myConn.SendNow(RTMPStream::SendUSR(4, 1)); //send UCM StreamIsRecorded (4), stream 1 } //send streambegin @@ -733,11 +754,11 @@ namespace Mist { myConn.SendNow(RTMPStream::SendUSR(32, 1)); //send UCM no clue?, stream 1 return; - } //seek - if ((amfData.getContentP(0)->StrValue() == "pauseRaw") || (amfData.getContentP(0)->StrValue() == "pause")) { + }//seek + if ((amfData.getContentP(0)->StrValue() == "pauseRaw") || (amfData.getContentP(0)->StrValue() == "pause")){ int playMessageType = messageType; int playStreamId = streamId; - if (amfData.getContentP(3)->NumValue()) { + if (amfData.getContentP(3)->NumValue()){ parseData = false; //send a status reply AMF::Object amfReply("container", AMF::AMF0_DDV_CONTAINER); @@ -751,7 +772,7 @@ namespace Mist { amfReply.getContentP(3)->addContent(AMF::Object("details", "DDV")); amfReply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337)); sendCommand(amfReply, playMessageType, playStreamId); - } else { + }else{ parseData = true; //send a status reply AMF::Object amfReply("container", AMF::AMF0_DDV_CONTAINER); @@ -767,13 +788,13 @@ namespace Mist { sendCommand(amfReply, playMessageType, playStreamId); } return; - } //seek - if (amfData.getContentP(0)->StrValue() == "_error") { + }//seek + if (amfData.getContentP(0)->StrValue() == "_error"){ WARN_MSG("Received error response: %s", amfData.Print().c_str()); return; } - if ((amfData.getContentP(0)->StrValue() == "_result") || (amfData.getContentP(0)->StrValue() == "onFCPublish") || (amfData.getContentP(0)->StrValue() == "onStatus")) { - //Results are ignored. We don't really care. + if ((amfData.getContentP(0)->StrValue() == "_result") || (amfData.getContentP(0)->StrValue() == "onFCPublish") || (amfData.getContentP(0)->StrValue() == "onStatus")){ + //Other results are ignored. We don't really care. return; } @@ -785,11 +806,11 @@ namespace Mist { amfReply.addContent(AMF::Object("", amfData.getContentP(0)->StrValue())); //null - command info amfReply.addContent(AMF::Object("", "Command not implemented or recognized")); //stream ID? sendCommand(amfReply, messageType, streamId); - } //parseAMFCommand + }//parseAMFCommand ///\brief Gets and parses one RTMP chunk at a time. ///\param inputBuffer A buffer filled with chunk data. - void OutRTMP::parseChunk(Socket::Buffer & inputBuffer) { + void OutRTMP::parseChunk(Socket::Buffer & inputBuffer){ //for DTSC conversion static std::stringstream prebuffer; // Temporary buffer before sending real data //for chunk parsing @@ -800,18 +821,18 @@ namespace Mist { static AMF::Object3 amf3data("empty", AMF::AMF3_DDV_CONTAINER); static AMF::Object3 amf3elem("empty", AMF::AMF3_DDV_CONTAINER); - while (next.Parse(inputBuffer)) { + while (next.Parse(inputBuffer)){ //send ACK if we received a whole window - if ((RTMPStream::rec_cnt - RTMPStream::rec_window_at > RTMPStream::rec_window_size)) { + if ((RTMPStream::rec_cnt - RTMPStream::rec_window_at > RTMPStream::rec_window_size)){ RTMPStream::rec_window_at = RTMPStream::rec_cnt; myConn.SendNow(RTMPStream::SendCTL(3, RTMPStream::rec_cnt)); //send ack (msg 3) } - switch (next.msg_type_id) { + switch (next.msg_type_id){ case 0: //does not exist WARN_MSG("UNKN: Received a zero-type message. Possible data corruption? Aborting!"); - while (inputBuffer.size()) { + while (inputBuffer.size()){ inputBuffer.get().clear(); } stop(); @@ -830,7 +851,7 @@ namespace Mist { RTMPStream::snd_window_at = ntohl(*(int *)next.data.c_str()); RTMPStream::snd_window_at = RTMPStream::snd_cnt; break; - case 4: { + case 4:{ //2 bytes event type, rest = event data //types: //0 = stream begin, 4 bytes ID @@ -842,7 +863,7 @@ namespace Mist { //7 = pingresponse, 4 bytes data //we don't need to process this short int ucmtype = ntohs(*(short int *)next.data.c_str()); - switch (ucmtype) { + switch (ucmtype){ case 0: MEDIUM_MSG("CTRL: UCM StreamBegin %i", ntohl(*((int *)(next.data.c_str() + 2)))); break; @@ -884,10 +905,10 @@ namespace Mist { break; case 8: //audio data case 9: //video data - case 18: {//meta data + case 18:{//meta data static std::map pushMeta; static std::map lastTagTime; - if (!isInitialized) { + if (!isInitialized){ MEDIUM_MSG("Received useless media data"); onFinish(); break; @@ -920,6 +941,15 @@ namespace Mist { onFinish(); break; } + if (myMeta.tracks[reTrack].codec == "PCM" && myMeta.tracks[reTrack].size == 16){ + char * ptr = F.getData(); + uint32_t ptrSize = F.getDataLen(); + for (uint32_t i = 0; i < ptrSize; i+=2){ + char tmpchar = ptr[i]; + ptr[i] = ptr[i+1]; + ptr[i+1] = tmpchar; + } + } thisPacket.genericFill(tagTime, F.offset(), reTrack, F.getData(), F.getDataLen(), 0, F.isKeyframe); ltt = tagTime; if (!nProxy.userClient.getData()){ @@ -937,24 +967,24 @@ namespace Mist { case 16: MEDIUM_MSG("Received AMF3 shared object"); break; - case 17: { + case 17:{ MEDIUM_MSG("Received AMF3 command message"); - if (next.data[0] != 0) { + if (next.data[0] != 0){ next.data = next.data.substr(1); amf3data = AMF::parse3(next.data); MEDIUM_MSG("AMF3: %s", amf3data.Print().c_str()); - } else { + }else{ MEDIUM_MSG("Received AMF3-0 command message"); next.data = next.data.substr(1); amfdata = AMF::parse(next.data); parseAMFCommand(amfdata, 17, next.msg_stream_id); - } //parsing AMF0-style + }//parsing AMF0-style } break; case 19: MEDIUM_MSG("Received AMF0 shared object"); break; - case 20: { //AMF0 command message + case 20:{//AMF0 command message amfdata = AMF::parse(next.data); parseAMFCommand(amfdata, 20, next.msg_stream_id); }