Merge branch 'development' into LTS_development

# Conflicts:
#	src/output/output_progressive_flv.cpp
#	src/output/output_rtmp.cpp
This commit is contained in:
Thulinma 2017-06-19 12:46:41 +02:00
commit de4fea643a
5 changed files with 134 additions and 71 deletions

View file

@ -182,7 +182,11 @@ const char * FLV::Tag::getVideoCodec() {
const char * FLV::Tag::getAudioCodec() {
switch (data[11] & 0xF0) {
case 0x00:
return "PCMPE";
if (data[11] & 0x02){
return "PCMPE";//unknown endianness
}else{
return "PCM";//8 bit is always regular PCM
}
case 0x10:
return "ADPCM";
case 0x20:

View file

@ -132,6 +132,18 @@ namespace Mist {
return getNext();
}
thisPacket.genericFill(tmpTag.tagTime(), tmpTag.offset(), tmpTag.getTrackID(), tmpTag.getData(), tmpTag.getDataLen(), lastBytePos, tmpTag.isKeyframe); //init packet from tmpTags data
DTSC::Track & trk = myMeta.tracks[tmpTag.getTrackID()];
if (trk.codec == "PCM" && trk.size == 16){
char * ptr = 0;
uint32_t ptrSize = 0;
thisPacket.getString("data", ptr, ptrSize);
for (uint32_t i = 0; i < ptrSize; i+=2){
char tmpchar = ptr[i];
ptr[i] = ptr[i+1];
ptr[i+1] = tmpchar;
}
}
}
void inputFLV::seek(int seekTime) {

View file

@ -174,7 +174,17 @@ namespace Mist {
H.Chunkify("", 0, myConn);
return;
}
tag.DTSCLoader(thisPacket, myMeta.tracks[thisPacket.getTrackId()]);
DTSC::Track & trk = myMeta.tracks[thisPacket.getTrackId()];
tag.DTSCLoader(thisPacket, trk);
if (trk.codec == "PCM" && trk.size == 16){
char * ptr = tag.getData();
uint32_t ptrSize = tag.getDataLen();
for (uint32_t i = 0; i < ptrSize; i+=2){
char tmpchar = ptr[i];
ptr[i] = ptr[i+1];
ptr[i+1] = tmpchar;
}
}
if (tag.len){
H.Chunkify(tag.data, tag.len, myConn);
}

View file

@ -103,7 +103,17 @@ namespace Mist {
}
}
tag.DTSCLoader(thisPacket, myMeta.tracks[thisPacket.getTrackId()]);
DTSC::Track & trk = myMeta.tracks[thisPacket.getTrackId()];
tag.DTSCLoader(thisPacket, trk);
if (trk.codec == "PCM" && trk.size == 16){
char * ptr = tag.getData();
uint32_t ptrSize = tag.getDataLen();
for (uint32_t i = 0; i < ptrSize; i+=2){
char tmpchar = ptr[i];
ptr[i] = ptr[i+1];
ptr[i+1] = tmpchar;
}
}
myConn.SendNow(tag.data, tag.len);
if (config->getBool("keyframeonly")){
config->is_active = false;

View file

@ -218,7 +218,7 @@ namespace Mist {
}
void OutRTMP::init(Util::Config * cfg) {
void OutRTMP::init(Util::Config * cfg){
Output::init(cfg);
capa["name"] = "RTMP";
capa["desc"] = "Enables ingest and output over Adobe's RTMP protocol.";
@ -262,7 +262,7 @@ namespace Mist {
cfg->addOption("streamname", JSON::fromString("{\"arg\":\"string\",\"short\":\"s\",\"long\":\"stream\",\"help\":\"The name of the stream to push out, when pushing out.\"}"));
}
void OutRTMP::sendNext() {
void OutRTMP::sendNext(){
//If there are now more selectable tracks, select the new track and do a seek to the current timestamp
//Set sentHeader to false to force it to send init data
@ -285,14 +285,16 @@ namespace Mist {
}
char rtmpheader[] = {0, //byte 0 = cs_id | ch_type
char rtmpheader[] ={0, //byte 0 = cs_id | ch_type
0, 0, 0, //bytes 1-3 = timestamp
0, 0, 0, //bytes 4-6 = length
0x12, //byte 7 = msg_type_id
1, 0, 0, 0, //bytes 8-11 = msg_stream_id = 1
0, 0, 0, 0}; //bytes 12-15 = extended timestamp
char dataheader[] = {0, 0, 0, 0, 0};
char dataheader[] ={0, 0, 0, 0, 0};
unsigned int dheader_len = 1;
static char * swappyPointer = 0;
static uint32_t swappySize = 0;
char * tmpData = 0;//pointer to raw media data
unsigned int data_len = 0;//length of processed media data
thisPacket.getString("data", tmpData, data_len);
@ -340,13 +342,29 @@ namespace Mist {
dataheader[0] |= 0x20;
}
}
if (track.codec == "ADPCM") {
if (track.codec == "ADPCM"){
dataheader[0] |= 0x10;
}
if (track.codec == "PCM") {
if (track.codec == "PCM"){
if (track.size == 16){
if (swappySize < data_len){
char * tmp = (char*)realloc(swappyPointer, data_len);
if (!tmp){
FAIL_MSG("Could not allocate data for PCM endianness swap!");
return;
}
swappyPointer = tmp;
swappySize = data_len;
}
for (uint32_t i = 0; i < data_len; i+=2){
swappyPointer[i] = tmpData[i+1];
swappyPointer[i+1] = tmpData[i];
}
tmpData = swappyPointer;
}
dataheader[0] |= 0x30;
}
if (track.codec == "Nellymoser") {
if (track.codec == "Nellymoser"){
if (track.rate == 8000){
dataheader[0] |= 0x50;
}else if(track.rate == 16000){
@ -355,13 +373,13 @@ namespace Mist {
dataheader[0] |= 0x60;
}
}
if (track.codec == "ALAW") {
if (track.codec == "ALAW"){
dataheader[0] |= 0x70;
}
if (track.codec == "ULAW") {
if (track.codec == "ULAW"){
dataheader[0] |= 0x80;
}
if (track.codec == "Speex") {
if (track.codec == "Speex"){
dataheader[0] |= 0xB0;
}
if (track.rate >= 44100){
@ -479,20 +497,20 @@ namespace Mist {
RTMPStream::snd_cnt += header_len + data_len + steps;
}
void OutRTMP::sendHeader() {
void OutRTMP::sendHeader(){
FLV::Tag tag;
tag.DTSCMetaInit(myMeta, selectedTracks);
if (tag.len) {
if (tag.len){
myConn.SendNow(RTMPStream::SendMedia(tag));
}
for (std::set<long unsigned int>::iterator it = selectedTracks.begin(); it != selectedTracks.end(); it++) {
if (myMeta.tracks[*it].type == "video") {
for (std::set<long unsigned int>::iterator it = selectedTracks.begin(); it != selectedTracks.end(); it++){
if (myMeta.tracks[*it].type == "video"){
if (tag.DTSCVideoInit(myMeta.tracks[*it])){
myConn.SendNow(RTMPStream::SendMedia(tag));
}
}
if (myMeta.tracks[*it].type == "audio") {
if (myMeta.tracks[*it].type == "audio"){
if (tag.DTSCAudioInit(myMeta.tracks[*it])){
myConn.SendNow(RTMPStream::SendMedia(tag));
}
@ -522,14 +540,14 @@ namespace Mist {
///\param amfReply The data to be sent over RTMP.
///\param messageType The type of message.
///\param streamId The ID of the AMF stream.
void OutRTMP::sendCommand(AMF::Object & amfReply, int messageType, int streamId) {
void OutRTMP::sendCommand(AMF::Object & amfReply, int messageType, int streamId){
HIGH_MSG("Sending: %s", amfReply.Print().c_str());
if (messageType == 17) {
if (messageType == 17){
myConn.SendNow(RTMPStream::SendChunk(3, messageType, streamId, (char)0 + amfReply.Pack()));
} else {
}else{
myConn.SendNow(RTMPStream::SendChunk(3, messageType, streamId, amfReply.Pack()));
}
} //sendCommand
}//sendCommand
///\brief Parses a single AMF command message, and sends a direct response through sendCommand().
///\param amfData The received request.
@ -553,7 +571,7 @@ namespace Mist {
void OutRTMP::parseAMFCommand(AMF::Object & amfData, int messageType, int streamId) {
MEDIUM_MSG("Received command: %s", amfData.Print().c_str());
HIGH_MSG("AMF0 command: %s", amfData.getContentP(0)->StrValue().c_str());
if (amfData.getContentP(0)->StrValue() == "xsbwtest") {
if (amfData.getContentP(0)->StrValue() == "xsbwtest"){
//send a _result reply
AMF::Object amfReply("container", AMF::AMF0_DDV_CONTAINER);
amfReply.addContent(AMF::Object("", "_error")); //result success
@ -563,9 +581,9 @@ namespace Mist {
sendCommand(amfReply, messageType, streamId);
return;
}
if (amfData.getContentP(0)->StrValue() == "connect") {
if (amfData.getContentP(0)->StrValue() == "connect"){
double objencoding = 0;
if (amfData.getContentP(2)->getContentP("objectEncoding")) {
if (amfData.getContentP(2)->getContentP("objectEncoding")){
objencoding = amfData.getContentP(2)->getContentP("objectEncoding")->NumValue();
}
if (amfData.getContentP(2)->getContentP("flashVer")) {
@ -603,8 +621,8 @@ namespace Mist {
//amfReply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null
//sendCommand(amfReply, messageType, streamId);
return;
} //connect
if (amfData.getContentP(0)->StrValue() == "createStream") {
}//connect
if (amfData.getContentP(0)->StrValue() == "createStream"){
//send a _result reply
AMF::Object amfReply("container", AMF::AMF0_DDV_CONTAINER);
amfReply.addContent(AMF::Object("", "_result")); //result success
@ -614,8 +632,8 @@ namespace Mist {
sendCommand(amfReply, messageType, streamId);
myConn.SendNow(RTMPStream::SendUSR(0, 1)); //send UCM StreamBegin (0), stream 1
return;
} //createStream
if (amfData.getContentP(0)->StrValue() == "ping") {
}//createStream
if (amfData.getContentP(0)->StrValue() == "ping"){
//send a _result reply
AMF::Object amfReply("container", AMF::AMF0_DDV_CONTAINER);
amfReply.addContent(AMF::Object("", "_result")); //result success
@ -624,7 +642,7 @@ namespace Mist {
amfReply.addContent(AMF::Object("", "Pong!")); //stream ID - we use 1
sendCommand(amfReply, messageType, streamId);
return;
} //createStream
}//createStream
if (amfData.getContentP(0)->StrValue() == "closeStream"){
myConn.SendNow(RTMPStream::SendUSR(1, 1)); //send UCM StreamEOF (1), stream 1
AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER);
@ -641,16 +659,16 @@ namespace Mist {
stop();
return;
}
if (amfData.getContentP(0)->StrValue() == "deleteStream") {
if (amfData.getContentP(0)->StrValue() == "deleteStream"){
stop();
onFinish();
return;
}
if ((amfData.getContentP(0)->StrValue() == "FCUnpublish") || (amfData.getContentP(0)->StrValue() == "releaseStream")) {
if ((amfData.getContentP(0)->StrValue() == "FCUnpublish") || (amfData.getContentP(0)->StrValue() == "releaseStream")){
// ignored
return;
}
if ((amfData.getContentP(0)->StrValue() == "FCSubscribe")) {
if ((amfData.getContentP(0)->StrValue() == "FCSubscribe")){
//send a FCPublish reply
AMF::Object amfReply("container", AMF::AMF0_DDV_CONTAINER);
amfReply.addContent(AMF::Object("", "onFCSubscribe")); //status reply
@ -662,8 +680,8 @@ namespace Mist {
amfReply.getContentP(3)->addContent(AMF::Object("description", "Please follow up with play or publish command, as we ignore this command."));
sendCommand(amfReply, messageType, streamId);
return;
} //FCPublish
if ((amfData.getContentP(0)->StrValue() == "FCPublish")) {
}//FCPublish
if ((amfData.getContentP(0)->StrValue() == "FCPublish")){
//send a FCPublish reply
AMF::Object amfReply("container", AMF::AMF0_DDV_CONTAINER);
amfReply.addContent(AMF::Object("", "onFCPublish")); //status reply
@ -674,8 +692,8 @@ namespace Mist {
amfReply.getContentP(3)->addContent(AMF::Object("description", "Please follow up with publish command, as we ignore this command."));
sendCommand(amfReply, messageType, streamId);
return;
} //FCPublish
if (amfData.getContentP(0)->StrValue() == "releaseStream") {
}//FCPublish
if (amfData.getContentP(0)->StrValue() == "releaseStream"){
//send a _result reply
AMF::Object amfReply("container", AMF::AMF0_DDV_CONTAINER);
amfReply.addContent(AMF::Object("", "_result")); //result success
@ -685,7 +703,7 @@ namespace Mist {
sendCommand(amfReply, messageType, streamId);
return;
}//releaseStream
if ((amfData.getContentP(0)->StrValue() == "getStreamLength") || (amfData.getContentP(0)->StrValue() == "getMovLen")) {
if ((amfData.getContentP(0)->StrValue() == "getStreamLength") || (amfData.getContentP(0)->StrValue() == "getMovLen")){
//send a _result reply
AMF::Object amfReply("container", AMF::AMF0_DDV_CONTAINER);
amfReply.addContent(AMF::Object("", "_result")); //result success
@ -694,9 +712,9 @@ namespace Mist {
amfReply.addContent(AMF::Object("", (double)0)); //zero length
sendCommand(amfReply, messageType, streamId);
return;
} //getStreamLength
if ((amfData.getContentP(0)->StrValue() == "publish")) {
if (amfData.getContentP(3)) {
}//getStreamLength
if ((amfData.getContentP(0)->StrValue() == "publish")){
if (amfData.getContentP(3)){
streamName = Encodings::URL::decode(amfData.getContentP(3)->StrValue());
reqUrl += "/"+streamName;//LTS
@ -761,8 +779,8 @@ namespace Mist {
amfReply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337));
sendCommand(amfReply, messageType, streamId);
return;
} //getStreamLength
if (amfData.getContentP(0)->StrValue() == "checkBandwidth") {
}//getStreamLength
if (amfData.getContentP(0)->StrValue() == "checkBandwidth"){
//send a _result reply
AMF::Object amfReply("container", AMF::AMF0_DDV_CONTAINER);
amfReply.addContent(AMF::Object("", "_result")); //result success
@ -771,11 +789,11 @@ namespace Mist {
amfReply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL)); //null - command info
sendCommand(amfReply, messageType, streamId);
return;
} //checkBandwidth
if (amfData.getContentP(0)->StrValue() == "onBWDone") {
}//checkBandwidth
if (amfData.getContentP(0)->StrValue() == "onBWDone"){
return;
}
if ((amfData.getContentP(0)->StrValue() == "play") || (amfData.getContentP(0)->StrValue() == "play2")) {
if ((amfData.getContentP(0)->StrValue() == "play") || (amfData.getContentP(0)->StrValue() == "play2")){
//set reply number and stream name, actual reply is sent up in the ss.spool() handler
int playTransaction = amfData.getContentP(1)->NumValue();
int playMessageType = messageType;
@ -815,7 +833,7 @@ namespace Mist {
amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337));
sendCommand(amfreply, playMessageType, playStreamId);
//send streamisrecorded if stream, well, is recorded.
if (myMeta.vod) { //isMember("length") && Strm.metadata["length"].asInt() > 0){
if (myMeta.vod){//isMember("length") && Strm.metadata["length"].asInt() > 0){
myConn.SendNow(RTMPStream::SendUSR(4, 1)); //send UCM StreamIsRecorded (4), stream 1
}
//send streambegin
@ -842,8 +860,8 @@ namespace Mist {
parseData = true;
return;
} //play
if ((amfData.getContentP(0)->StrValue() == "seek")) {
}//play
if ((amfData.getContentP(0)->StrValue() == "seek")){
//set reply number and stream name, actual reply is sent up in the ss.spool() handler
int playTransaction = amfData.getContentP(1)->NumValue();
int playMessageType = messageType;
@ -875,7 +893,7 @@ namespace Mist {
amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337));
sendCommand(amfreply, playMessageType, playStreamId);
//send streamisrecorded if stream, well, is recorded.
if (myMeta.vod) { //isMember("length") && Strm.metadata["length"].asInt() > 0){
if (myMeta.vod){//isMember("length") && Strm.metadata["length"].asInt() > 0){
myConn.SendNow(RTMPStream::SendUSR(4, 1)); //send UCM StreamIsRecorded (4), stream 1
}
//send streambegin
@ -902,11 +920,11 @@ namespace Mist {
myConn.SendNow(RTMPStream::SendUSR(32, 1)); //send UCM no clue?, stream 1
return;
} //seek
if ((amfData.getContentP(0)->StrValue() == "pauseRaw") || (amfData.getContentP(0)->StrValue() == "pause")) {
}//seek
if ((amfData.getContentP(0)->StrValue() == "pauseRaw") || (amfData.getContentP(0)->StrValue() == "pause")){
int playMessageType = messageType;
int playStreamId = streamId;
if (amfData.getContentP(3)->NumValue()) {
if (amfData.getContentP(3)->NumValue()){
parseData = false;
//send a status reply
AMF::Object amfReply("container", AMF::AMF0_DDV_CONTAINER);
@ -920,7 +938,7 @@ namespace Mist {
amfReply.getContentP(3)->addContent(AMF::Object("details", "DDV"));
amfReply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337));
sendCommand(amfReply, playMessageType, playStreamId);
} else {
}else{
parseData = true;
//send a status reply
AMF::Object amfReply("container", AMF::AMF0_DDV_CONTAINER);
@ -936,8 +954,8 @@ namespace Mist {
sendCommand(amfReply, playMessageType, playStreamId);
}
return;
} //seek
if (amfData.getContentP(0)->StrValue() == "_error") {
}//seek
if (amfData.getContentP(0)->StrValue() == "_error"){
WARN_MSG("Received error response: %s", amfData.Print().c_str());
return;
}
@ -992,11 +1010,11 @@ namespace Mist {
amfReply.addContent(AMF::Object("", amfData.getContentP(0)->StrValue())); //null - command info
amfReply.addContent(AMF::Object("", "Command not implemented or recognized")); //stream ID?
sendCommand(amfReply, messageType, streamId);
} //parseAMFCommand
}//parseAMFCommand
///\brief Gets and parses one RTMP chunk at a time.
///\param inputBuffer A buffer filled with chunk data.
void OutRTMP::parseChunk(Socket::Buffer & inputBuffer) {
void OutRTMP::parseChunk(Socket::Buffer & inputBuffer){
//for DTSC conversion
static std::stringstream prebuffer; // Temporary buffer before sending real data
//for chunk parsing
@ -1007,18 +1025,18 @@ namespace Mist {
static AMF::Object3 amf3data("empty", AMF::AMF3_DDV_CONTAINER);
static AMF::Object3 amf3elem("empty", AMF::AMF3_DDV_CONTAINER);
while (next.Parse(inputBuffer)) {
while (next.Parse(inputBuffer)){
//send ACK if we received a whole window
if ((RTMPStream::rec_cnt - RTMPStream::rec_window_at > RTMPStream::rec_window_size)) {
if ((RTMPStream::rec_cnt - RTMPStream::rec_window_at > RTMPStream::rec_window_size)){
RTMPStream::rec_window_at = RTMPStream::rec_cnt;
myConn.SendNow(RTMPStream::SendCTL(3, RTMPStream::rec_cnt)); //send ack (msg 3)
}
switch (next.msg_type_id) {
switch (next.msg_type_id){
case 0: //does not exist
WARN_MSG("UNKN: Received a zero-type message. Possible data corruption? Aborting!");
while (inputBuffer.size()) {
while (inputBuffer.size()){
inputBuffer.get().clear();
}
stop();
@ -1037,7 +1055,7 @@ namespace Mist {
RTMPStream::snd_window_at = ntohl(*(int *)next.data.c_str());
RTMPStream::snd_window_at = RTMPStream::snd_cnt;
break;
case 4: {
case 4:{
//2 bytes event type, rest = event data
//types:
//0 = stream begin, 4 bytes ID
@ -1049,7 +1067,7 @@ namespace Mist {
//7 = pingresponse, 4 bytes data
//we don't need to process this
short int ucmtype = ntohs(*(short int *)next.data.c_str());
switch (ucmtype) {
switch (ucmtype){
case 0:
MEDIUM_MSG("CTRL: UCM StreamBegin %i", ntohl(*((int *)(next.data.c_str() + 2))));
break;
@ -1091,10 +1109,10 @@ namespace Mist {
break;
case 8: //audio data
case 9: //video data
case 18: {//meta data
case 18:{//meta data
static std::map<unsigned int, AMF::Object> pushMeta;
static std::map<uint64_t, uint64_t> lastTagTime;
if (!isInitialized) {
if (!isInitialized){
MEDIUM_MSG("Received useless media data");
onFinish();
break;
@ -1127,6 +1145,15 @@ namespace Mist {
onFinish();
break;
}
if (myMeta.tracks[reTrack].codec == "PCM" && myMeta.tracks[reTrack].size == 16){
char * ptr = F.getData();
uint32_t ptrSize = F.getDataLen();
for (uint32_t i = 0; i < ptrSize; i+=2){
char tmpchar = ptr[i];
ptr[i] = ptr[i+1];
ptr[i+1] = tmpchar;
}
}
thisPacket.genericFill(tagTime, F.offset(), reTrack, F.getData(), F.getDataLen(), 0, F.isKeyframe);
ltt = tagTime;
if (!nProxy.userClient.getData()){
@ -1144,24 +1171,24 @@ namespace Mist {
case 16:
MEDIUM_MSG("Received AMF3 shared object");
break;
case 17: {
case 17:{
MEDIUM_MSG("Received AMF3 command message");
if (next.data[0] != 0) {
if (next.data[0] != 0){
next.data = next.data.substr(1);
amf3data = AMF::parse3(next.data);
MEDIUM_MSG("AMF3: %s", amf3data.Print().c_str());
} else {
}else{
MEDIUM_MSG("Received AMF3-0 command message");
next.data = next.data.substr(1);
amfdata = AMF::parse(next.data);
parseAMFCommand(amfdata, 17, next.msg_stream_id);
} //parsing AMF0-style
}//parsing AMF0-style
}
break;
case 19:
MEDIUM_MSG("Received AMF0 shared object");
break;
case 20: { //AMF0 command message
case 20:{//AMF0 command message
amfdata = AMF::parse(next.data);
parseAMFCommand(amfdata, 20, next.msg_stream_id);
}