Merge branch 'master' of projectlivestream.com:pls

This commit is contained in:
Erik Zandvliet 2011-01-17 16:22:17 +01:00
commit 8447b02922
3 changed files with 40 additions and 15 deletions

View file

@ -38,12 +38,14 @@ class user{
int MyBuffer_len; int MyBuffer_len;
int MyNum; int MyNum;
int currsend; int currsend;
bool gotproperaudio;
void * lastpointer; void * lastpointer;
static int UserCount; static int UserCount;
int s; int s;
user(int fd){ user(int fd){
s = fd; s = fd;
MyNum = UserCount++; MyNum = UserCount++;
gotproperaudio = false;
std::cout << "User " << MyNum << " connected" << std::endl; std::cout << "User " << MyNum << " connected" << std::endl;
}//constructor }//constructor
void Disconnect(std::string reason) { void Disconnect(std::string reason) {
@ -64,6 +66,7 @@ class user{
return (currsend == todo); return (currsend == todo);
} }
void Send(buffer ** ringbuf, int buffers){ void Send(buffer ** ringbuf, int buffers){
//TODO: Bij MP3: gotproperaudio - if false, stuur alleen als eerste byte is 0xFF en set op true
//not connected? cancel //not connected? cancel
if (s < 0){return;} if (s < 0){return;}
//still waiting for next buffer? check it //still waiting for next buffer? check it

View file

@ -3,7 +3,8 @@
//debugging level 2 = errors //debugging level 2 = errors
//debugging level 3 = status information //debugging level 3 = status information
//debugging level 4 = extremely verbose status information //debugging level 4 = extremely verbose status information
#define DEBUG 3 //debugging level 5 = save all streams to FLV files
#define DEBUG 4
#include <iostream> #include <iostream>
#include <cstdlib> #include <cstdlib>
@ -38,7 +39,6 @@ int mainHandler(int connection){
FLV_Pack * tag = 0; FLV_Pack * tag = 0;
//first timestamp set //first timestamp set
int lastcheck = getNowMS();
firsttime = getNowMS(); firsttime = getNowMS();
if (doHandshake()){ if (doHandshake()){
@ -60,10 +60,11 @@ int mainHandler(int connection){
ev.data.fd = CONN_fd; ev.data.fd = CONN_fd;
epoll_ctl(poller, EPOLL_CTL_ADD, CONN_fd, &ev); epoll_ctl(poller, EPOLL_CTL_ADD, CONN_fd, &ev);
struct epoll_event events[1]; struct epoll_event events[1];
#if DEBUG >= 5
//for writing whole stream to a file
FILE * tmpfile = 0; FILE * tmpfile = 0;
char tmpstr[200]; char tmpstr[200];
#endif
while (!socketError && !All_Hell_Broke_Loose){ while (!socketError && !All_Hell_Broke_Loose){
//only parse input if available or not yet init'ed //only parse input if available or not yet init'ed
@ -80,7 +81,6 @@ int mainHandler(int connection){
case -1: break;//not ready yet case -1: break;//not ready yet
default: default:
parseChunk(); parseChunk();
lastcheck = getNowMS();
break; break;
} }
} }
@ -135,16 +135,15 @@ int mainHandler(int connection){
tag->data[6] = ftst % 256; tag->data[6] = ftst % 256;
} }
SendMedia((unsigned char)tag->data[0], (unsigned char *)tag->data+11, tag->len-15, ts); SendMedia((unsigned char)tag->data[0], (unsigned char *)tag->data+11, tag->len-15, ts);
#if DEBUG >= 5
//write whole stream to a file
if (tmpfile == 0){ if (tmpfile == 0){
sprintf(tmpstr, "./tmpfile_socket_%i.flv", CONN_fd); sprintf(tmpstr, "./tmpfile_socket_%i.flv", CONN_fd);
tmpfile = fopen(tmpstr, "w"); tmpfile = fopen(tmpstr, "w");
fwrite(FLVHeader, 13, 1, tmpfile); fwrite(FLVHeader, 13, 1, tmpfile);
} }
fwrite(tag->data, tag->len, 1, tmpfile); fwrite(tag->data, tag->len, 1, tmpfile);
#endif
lastcheck = getNowMS();
#if DEBUG >= 4 #if DEBUG >= 4
fprintf(stderr, "Sent a tag to %i\n", CONN_fd); fprintf(stderr, "Sent a tag to %i\n", CONN_fd);
#endif #endif
@ -153,14 +152,15 @@ int mainHandler(int connection){
} }
} }
//send ACK if we received a whole window //send ACK if we received a whole window
if ((rec_cnt - rec_window_at > rec_window_size) || (getNowMS() - lastcheck > 1)){ if ((rec_cnt - rec_window_at > rec_window_size)){
rec_window_at = rec_cnt; rec_window_at = rec_cnt;
SendCTL(3, rec_cnt);//send ack (msg 3) SendCTL(3, rec_cnt);//send ack (msg 3)
lastcheck = getNowMS();
} }
} }
close(CONN_fd); close(CONN_fd);
#if DEBUG >= 5
fclose(tmpfile); fclose(tmpfile);
#endif
if (inited) close(ss); if (inited) close(ss);
#if DEBUG >= 1 #if DEBUG >= 1
if (All_Hell_Broke_Loose){fprintf(stderr, "All Hell Broke Loose\n");} if (All_Hell_Broke_Loose){fprintf(stderr, "All Hell Broke Loose\n");}

View file

@ -45,6 +45,7 @@ void parseChunk(){
//6 = pingrequest, 4 bytes data //6 = pingrequest, 4 bytes data
//7 = pingresponse, 4 bytes data //7 = pingresponse, 4 bytes data
//we don't need to process this //we don't need to process this
SendCTL(3, rec_cnt);//send ack (msg 3)
} break; } break;
case 5://window size of other end case 5://window size of other end
#if DEBUG >= 4 #if DEBUG >= 4
@ -100,6 +101,9 @@ void parseChunk(){
case 20:{//AMF0 command message case 20:{//AMF0 command message
bool parsed = false; bool parsed = false;
amfdata = parseAMF(next.data, next.real_len); amfdata = parseAMF(next.data, next.real_len);
#if DEBUG >= 4
amfdata.Print();
#endif
if (amfdata.getContentP(0)->StrValue() == "connect"){ if (amfdata.getContentP(0)->StrValue() == "connect"){
#if DEBUG >= 4 #if DEBUG >= 4
int tmpint; int tmpint;
@ -112,7 +116,7 @@ void parseChunk(){
#endif #endif
SendCTL(6, rec_window_size, 0);//send peer bandwidth (msg 6) SendCTL(6, rec_window_size, 0);//send peer bandwidth (msg 6)
SendCTL(5, snd_window_size);//send window acknowledgement size (msg 5) SendCTL(5, snd_window_size);//send window acknowledgement size (msg 5)
SendUSR(0, 0);//send UCM StreamBegin (0), stream 0 SendUSR(0, 1);//send UCM StreamBegin (0), stream 1
//send a _result reply //send a _result reply
AMFType amfreply("container", (unsigned char)0xFF); AMFType amfreply("container", (unsigned char)0xFF);
amfreply.addContent(AMFType("", "_result"));//result success amfreply.addContent(AMFType("", "_result"));//result success
@ -127,6 +131,9 @@ void parseChunk(){
amfreply.getContentP(3)->addContent(AMFType("description", "Connection succeeded.")); amfreply.getContentP(3)->addContent(AMFType("description", "Connection succeeded."));
amfreply.getContentP(3)->addContent(AMFType("capabilities", (double)33));//from red5 server amfreply.getContentP(3)->addContent(AMFType("capabilities", (double)33));//from red5 server
amfreply.getContentP(3)->addContent(AMFType("fmsVer", "PLS/1,0,0,0"));//from red5 server amfreply.getContentP(3)->addContent(AMFType("fmsVer", "PLS/1,0,0,0"));//from red5 server
#if DEBUG >= 4
amfreply.Print();
#endif
SendChunk(3, 20, next.msg_stream_id, amfreply.Pack()); SendChunk(3, 20, next.msg_stream_id, amfreply.Pack());
//send onBWDone packet //send onBWDone packet
//amfreply = AMFType("container", (unsigned char)0xFF); //amfreply = AMFType("container", (unsigned char)0xFF);
@ -143,8 +150,11 @@ void parseChunk(){
amfreply.addContent(amfdata.getContent(1));//same transaction ID amfreply.addContent(amfdata.getContent(1));//same transaction ID
amfreply.addContent(AMFType("", (double)0, 0x05));//null - command info amfreply.addContent(AMFType("", (double)0, 0x05));//null - command info
amfreply.addContent(AMFType("", (double)1));//stream ID - we use 1 amfreply.addContent(AMFType("", (double)1));//stream ID - we use 1
#if DEBUG >= 4
amfreply.Print();
#endif
SendChunk(3, 20, next.msg_stream_id, amfreply.Pack()); SendChunk(3, 20, next.msg_stream_id, amfreply.Pack());
SendUSR(0, 0);//send UCM StreamBegin (0), stream 0 SendUSR(0, 1);//send UCM StreamBegin (0), stream 1
parsed = true; parsed = true;
}//createStream }//createStream
if ((amfdata.getContentP(0)->StrValue() == "getStreamLength") || (amfdata.getContentP(0)->StrValue() == "getMovLen")){ if ((amfdata.getContentP(0)->StrValue() == "getStreamLength") || (amfdata.getContentP(0)->StrValue() == "getMovLen")){
@ -154,6 +164,9 @@ void parseChunk(){
amfreply.addContent(amfdata.getContent(1));//same transaction ID amfreply.addContent(amfdata.getContent(1));//same transaction ID
amfreply.addContent(AMFType("", (double)0, 0x05));//null - command info amfreply.addContent(AMFType("", (double)0, 0x05));//null - command info
amfreply.addContent(AMFType("", (double)0));//zero length amfreply.addContent(AMFType("", (double)0));//zero length
#if DEBUG >= 4
amfreply.Print();
#endif
SendChunk(3, 20, next.msg_stream_id, amfreply.Pack()); SendChunk(3, 20, next.msg_stream_id, amfreply.Pack());
parsed = true; parsed = true;
}//getStreamLength }//getStreamLength
@ -164,6 +177,9 @@ void parseChunk(){
amfreply.addContent(amfdata.getContent(1));//same transaction ID amfreply.addContent(amfdata.getContent(1));//same transaction ID
amfreply.addContent(AMFType("", (double)0, 0x05));//null - command info amfreply.addContent(AMFType("", (double)0, 0x05));//null - command info
amfreply.addContent(AMFType("", (double)0, 0x05));//null - command info amfreply.addContent(AMFType("", (double)0, 0x05));//null - command info
#if DEBUG >= 4
amfreply.Print();
#endif
SendChunk(3, 20, 1, amfreply.Pack()); SendChunk(3, 20, 1, amfreply.Pack());
parsed = true; parsed = true;
}//checkBandwidth }//checkBandwidth
@ -186,6 +202,9 @@ void parseChunk(){
amfreply.getContentP(3)->addContent(AMFType("description", "Playing and resetting...")); amfreply.getContentP(3)->addContent(AMFType("description", "Playing and resetting..."));
amfreply.getContentP(3)->addContent(AMFType("details", "PLS")); amfreply.getContentP(3)->addContent(AMFType("details", "PLS"));
amfreply.getContentP(3)->addContent(AMFType("clientid", (double)1)); amfreply.getContentP(3)->addContent(AMFType("clientid", (double)1));
#if DEBUG >= 4
amfreply.Print();
#endif
SendChunk(4, 20, next.msg_stream_id, amfreply.Pack()); SendChunk(4, 20, next.msg_stream_id, amfreply.Pack());
amfreply = AMFType("container", (unsigned char)0xFF); amfreply = AMFType("container", (unsigned char)0xFF);
amfreply.addContent(AMFType("", "onStatus"));//status reply amfreply.addContent(AMFType("", "onStatus"));//status reply
@ -197,6 +216,9 @@ void parseChunk(){
amfreply.getContentP(3)->addContent(AMFType("description", "Playing!")); amfreply.getContentP(3)->addContent(AMFType("description", "Playing!"));
amfreply.getContentP(3)->addContent(AMFType("details", "PLS")); amfreply.getContentP(3)->addContent(AMFType("details", "PLS"));
amfreply.getContentP(3)->addContent(AMFType("clientid", (double)1)); amfreply.getContentP(3)->addContent(AMFType("clientid", (double)1));
#if DEBUG >= 4
amfreply.Print();
#endif
SendChunk(4, 20, 1, amfreply.Pack()); SendChunk(4, 20, 1, amfreply.Pack());
//No clue what this does. Most real servers send it, though... //No clue what this does. Most real servers send it, though...
// amfreply = AMFType("container", (unsigned char)0xFF); // amfreply = AMFType("container", (unsigned char)0xFF);