Merge fix
This commit is contained in:
commit
8c01abba25
7 changed files with 328 additions and 74 deletions
|
@ -13,8 +13,6 @@
|
|||
#include "../util/flv_tag.h" //FLV format parser
|
||||
#include "../util/socket.h" //Socket lib
|
||||
|
||||
#include <sys/epoll.h>
|
||||
|
||||
/// Holds all code unique to the Buffer.
|
||||
namespace Buffer{
|
||||
|
||||
|
@ -137,9 +135,16 @@ namespace Buffer{
|
|||
|
||||
//then check and parse the commandline
|
||||
if (argc < 3) {
|
||||
std::cout << "usage: " << argv[0] << " buffers_count streamname" << std::endl;
|
||||
std::cout << "usage: " << argv[0] << " buffers_count streamname [awaiting_IP]" << std::endl;
|
||||
return 1;
|
||||
}
|
||||
std::string waiting_ip = "";
|
||||
bool ip_waiting = false;
|
||||
Socket::Connection ip_input;
|
||||
if (argc >= 4){
|
||||
waiting_ip += argv[3];
|
||||
ip_waiting = true;
|
||||
}
|
||||
std::string shared_socket = "/tmp/shared_socket_";
|
||||
shared_socket += argv[2];
|
||||
|
||||
|
@ -156,26 +161,23 @@ namespace Buffer{
|
|||
int lastproper = 0;//last properly finished buffer number
|
||||
unsigned int loopcount = 0;
|
||||
Socket::Connection incoming;
|
||||
Socket::Connection std_input(fileno(stdin));
|
||||
|
||||
unsigned char packtype;
|
||||
bool gotVideoInfo = false;
|
||||
bool gotAudioInfo = false;
|
||||
bool gotData = false;
|
||||
|
||||
int infile = fileno(stdin);//get file number for stdin
|
||||
|
||||
//add stdin to an epoll
|
||||
int poller = epoll_create(1);
|
||||
struct epoll_event ev;
|
||||
ev.events = EPOLLIN;
|
||||
ev.data.fd = infile;
|
||||
epoll_ctl(poller, EPOLL_CTL_ADD, infile, &ev);
|
||||
struct epoll_event events[1];
|
||||
|
||||
|
||||
while(!feof(stdin) && !FLV::Parse_Error){
|
||||
while((!feof(stdin) || ip_waiting) && !FLV::Parse_Error){
|
||||
//invalidate the current buffer
|
||||
ringbuf[current_buffer]->number = -1;
|
||||
if ((epoll_wait(poller, events, 1, 10) > 0) && ringbuf[current_buffer]->FLV.FileLoader(stdin)){
|
||||
if (
|
||||
(!ip_waiting &&
|
||||
(std_input.canRead()) && ringbuf[current_buffer]->FLV.FileLoader(stdin)
|
||||
) || (ip_waiting && (ip_input.connected()) &&
|
||||
ringbuf[current_buffer]->FLV.SockLoader(ip_input)
|
||||
)
|
||||
){
|
||||
loopcount++;
|
||||
packtype = ringbuf[current_buffer]->FLV.data[0];
|
||||
//store metadata, if available
|
||||
|
@ -215,6 +217,7 @@ namespace Buffer{
|
|||
lastproper = current_buffer;
|
||||
}
|
||||
}
|
||||
if (loopcount > 5){gotData = true;}
|
||||
//keep track of buffers
|
||||
ringbuf[current_buffer]->number = loopcount;
|
||||
current_buffer++;
|
||||
|
@ -230,17 +233,25 @@ namespace Buffer{
|
|||
users.back().MyBuffer = lastproper;
|
||||
users.back().MyBuffer_num = -1;
|
||||
/// \todo Do this more nicely?
|
||||
if (!users.back().S.write(FLV::Header, 13)){
|
||||
users.back().Disconnect("failed to receive the header!");
|
||||
}else{
|
||||
if (!users.back().S.write(metadata.data, metadata.len)){
|
||||
users.back().Disconnect("failed to receive metadata!");
|
||||
}
|
||||
if (!users.back().S.write(audio_init.data, audio_init.len)){
|
||||
users.back().Disconnect("failed to receive audio init!");
|
||||
}
|
||||
if (!users.back().S.write(video_init.data, video_init.len)){
|
||||
users.back().Disconnect("failed to receive video init!");
|
||||
if (gotData){
|
||||
if (!users.back().S.write(FLV::Header, 13)){
|
||||
users.back().Disconnect("failed to receive the header!");
|
||||
}else{
|
||||
if (metadata.len > 0){
|
||||
if (!users.back().S.write(metadata.data, metadata.len)){
|
||||
users.back().Disconnect("failed to receive metadata!");
|
||||
}
|
||||
}
|
||||
if (audio_init.len > 0){
|
||||
if (!users.back().S.write(audio_init.data, audio_init.len)){
|
||||
users.back().Disconnect("failed to receive audio init!");
|
||||
}
|
||||
}
|
||||
if (video_init.len > 0){
|
||||
if (!users.back().S.write(video_init.data, video_init.len)){
|
||||
users.back().Disconnect("failed to receive video init!");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -251,6 +262,29 @@ namespace Buffer{
|
|||
if (!(*usersIt).S.connected()){
|
||||
users.erase(usersIt); break;
|
||||
}else{
|
||||
if (!gotData && ip_waiting){
|
||||
if ((*usersIt).S.canRead()){
|
||||
std::string tmp = "";
|
||||
char charbuf;
|
||||
while (((*usersIt).S.iread(&charbuf, 1) == 1) && charbuf != '\n' ){
|
||||
tmp += charbuf;
|
||||
}
|
||||
if (tmp != ""){
|
||||
std::cout << "Push attempt from IP " << tmp << std::endl;
|
||||
if (tmp == waiting_ip){
|
||||
if (!ip_input.connected()){
|
||||
std::cout << "Push accepted!" << std::endl;
|
||||
ip_input = (*usersIt).S;
|
||||
users.erase(usersIt); break;
|
||||
}else{
|
||||
std::cout << "Push denied - push already in progress!" << std::endl;
|
||||
}
|
||||
}else{
|
||||
std::cout << "Push denied!" << std::endl;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
(*usersIt).Send(ringbuf, buffers);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -9,7 +9,6 @@
|
|||
#include <unistd.h>
|
||||
#include <sys/types.h>
|
||||
#include <sys/wait.h>
|
||||
#include <sys/epoll.h>
|
||||
#include <getopt.h>
|
||||
#include <ctime>
|
||||
#include "../util/socket.h"
|
||||
|
@ -134,15 +133,6 @@ namespace Connector_HTTP{
|
|||
bool FlashFirstAudio = false;
|
||||
HTTP::Parser HTTP_R, HTTP_S;//HTTP Receiver en HTTP Sender.
|
||||
|
||||
int retval;
|
||||
int poller = epoll_create(1);
|
||||
int sspoller = epoll_create(1);
|
||||
struct epoll_event ev;
|
||||
ev.events = EPOLLIN;
|
||||
ev.data.fd = conn.getSocket();
|
||||
epoll_ctl(poller, EPOLL_CTL_ADD, conn.getSocket(), &ev);
|
||||
struct epoll_event events[1];
|
||||
|
||||
std::string Movie = "";
|
||||
std::string Quality = "";
|
||||
int Segment = -1;
|
||||
|
@ -221,9 +211,6 @@ namespace Connector_HTTP{
|
|||
conn.close();
|
||||
break;
|
||||
}
|
||||
ev.events = EPOLLIN;
|
||||
ev.data.fd = ss.getSocket();
|
||||
epoll_ctl(sspoller, EPOLL_CTL_ADD, ss.getSocket(), &ev);
|
||||
#if DEBUG >= 3
|
||||
fprintf(stderr, "Everything connected, starting to send video data...\n");
|
||||
#endif
|
||||
|
@ -240,7 +227,7 @@ namespace Connector_HTTP{
|
|||
fprintf(stderr, "Sending a video fragment. %i left in buffer, %i requested\n", (int)Flash_FragBuffer.size(), Flash_RequestPending);
|
||||
#endif
|
||||
}
|
||||
retval = epoll_wait(sspoller, events, 1, 1);
|
||||
ss.canRead();
|
||||
switch (ss.ready()){
|
||||
case -1:
|
||||
conn.close();
|
||||
|
@ -250,7 +237,7 @@ namespace Connector_HTTP{
|
|||
break;
|
||||
case 0: break;//not ready yet
|
||||
default:
|
||||
if (tag.SockLoader(ss)){//able to read a full packet?f
|
||||
if (tag.SockLoader(ss)){//able to read a full packet?
|
||||
if (handler == HANDLER_FLASH){
|
||||
if (tag.tagTime() > 0){
|
||||
if (Flash_StartTime == 0){
|
||||
|
@ -282,14 +269,18 @@ namespace Connector_HTTP{
|
|||
FlashFirstVideo = true;
|
||||
FlashFirstAudio = true;
|
||||
}
|
||||
if (FlashFirstVideo && (tag.data[0] == 0x09) && (Video_Init.len > 0)){
|
||||
Video_Init.tagTime(tag.tagTime());
|
||||
FlashBuf.append(Video_Init.data, Video_Init.len);
|
||||
if (FlashFirstVideo && (tag.data[0] == 0x09) && (!tag.needsInitData() || (Video_Init.len > 0))){
|
||||
if (tag.needsInitData()){
|
||||
Video_Init.tagTime(tag.tagTime());
|
||||
FlashBuf.append(Video_Init.data, Video_Init.len);
|
||||
}
|
||||
FlashFirstVideo = false;
|
||||
}
|
||||
if (FlashFirstAudio && (tag.data[0] == 0x08) && (Audio_Init.len > 0)){
|
||||
Audio_Init.tagTime(tag.tagTime());
|
||||
FlashBuf.append(Audio_Init.data, Audio_Init.len);
|
||||
if (FlashFirstAudio && (tag.data[0] == 0x08) && (!tag.needsInitData() || (Audio_Init.len > 0))){
|
||||
if (tag.needsInitData()){
|
||||
Audio_Init.tagTime(tag.tagTime());
|
||||
FlashBuf.append(Audio_Init.data, Audio_Init.len);
|
||||
}
|
||||
FlashFirstAudio = false;
|
||||
}
|
||||
#if DEBUG >= 5
|
||||
|
|
|
@ -9,7 +9,6 @@
|
|||
#include <signal.h>
|
||||
#include <sys/types.h>
|
||||
#include <sys/wait.h>
|
||||
#include <sys/epoll.h>
|
||||
#include <getopt.h>
|
||||
#include "../util/socket.h"
|
||||
#include "../util/flv_tag.h"
|
||||
|
@ -25,6 +24,7 @@ namespace Connector_RTMP{
|
|||
bool stopparsing = false; ///< Set to true when all parsing needs to be cancelled.
|
||||
|
||||
Socket::Connection Socket; ///< Socket connected to user
|
||||
Socket::Connection SS; ///< Socket connected to server
|
||||
std::string streamname = "/tmp/shared_socket"; ///< Stream that will be opened
|
||||
void parseChunk();
|
||||
int Connector_RTMP(Socket::Connection conn);
|
||||
|
@ -34,7 +34,6 @@ namespace Connector_RTMP{
|
|||
/// Main Connector_RTMP function
|
||||
int Connector_RTMP::Connector_RTMP(Socket::Connection conn){
|
||||
Socket = conn;
|
||||
Socket::Connection SS;
|
||||
FLV::Tag tag, viddata, auddata;
|
||||
bool viddone = false, auddone = false;
|
||||
|
||||
|
@ -59,20 +58,10 @@ int Connector_RTMP::Connector_RTMP(Socket::Connection conn){
|
|||
return 0;
|
||||
}
|
||||
|
||||
int retval;
|
||||
int poller = epoll_create(1);
|
||||
int sspoller = epoll_create(1);
|
||||
struct epoll_event ev;
|
||||
ev.events = EPOLLIN;
|
||||
ev.data.fd = Socket.getSocket();
|
||||
epoll_ctl(poller, EPOLL_CTL_ADD, Socket.getSocket(), &ev);
|
||||
struct epoll_event events[1];
|
||||
|
||||
while (Socket.connected() && !FLV::Parse_Error){
|
||||
//only parse input if available or not yet init'ed
|
||||
//rightnow = getNowMS();
|
||||
retval = epoll_wait(poller, events, 1, 1);
|
||||
if ((retval > 0) || !ready4data){// || (snd_cnt - snd_window_at >= snd_window_size)
|
||||
if (Socket.canRead() || !ready4data){// || (snd_cnt - snd_window_at >= snd_window_size)
|
||||
switch (Socket.ready()){
|
||||
case -1: break; //disconnected
|
||||
case 0: break; //not ready yet
|
||||
|
@ -90,15 +79,12 @@ int Connector_RTMP::Connector_RTMP(Socket::Connection conn){
|
|||
Socket.close();//disconnect user
|
||||
break;
|
||||
}
|
||||
ev.events = EPOLLIN;
|
||||
ev.data.fd = SS.getSocket();
|
||||
epoll_ctl(sspoller, EPOLL_CTL_ADD, SS.getSocket(), &ev);
|
||||
#if DEBUG >= 3
|
||||
fprintf(stderr, "Everything connected, starting to send video data...\n");
|
||||
#endif
|
||||
inited = true;
|
||||
}
|
||||
retval = epoll_wait(sspoller, events, 1, 1);
|
||||
SS.canRead();
|
||||
switch (SS.ready()){
|
||||
case -1:
|
||||
#if DEBUG >= 1
|
||||
|
@ -135,7 +121,10 @@ int Connector_RTMP::Connector_RTMP(Socket::Connection conn){
|
|||
break;
|
||||
}
|
||||
//not gotten init yet? cancel this tag
|
||||
if (viddata.len == 0 || auddata.len == 0){break;}
|
||||
if (tag.needsInitData()){
|
||||
if ((tag.data[0] == 0x09) && (viddata.len == 0)){break;}
|
||||
if ((tag.data[0] == 0x08) && (auddata.len == 0)){break;}
|
||||
}
|
||||
//send tag normally
|
||||
Socket.write(RTMPStream::SendMedia(tag));
|
||||
#if DEBUG >= 8
|
||||
|
@ -168,6 +157,7 @@ int Connector_RTMP::Connector_RTMP(Socket::Connection conn){
|
|||
void Connector_RTMP::parseChunk(){
|
||||
static RTMPStream::Chunk next;
|
||||
static std::string inbuffer;
|
||||
FLV::Tag F;
|
||||
static AMF::Object amfdata("empty", AMF::AMF0_DDV_CONTAINER);
|
||||
static AMF::Object amfelem("empty", AMF::AMF0_DDV_CONTAINER);
|
||||
static AMF::Object3 amf3data("empty", AMF::AMF3_DDV_CONTAINER);
|
||||
|
@ -243,11 +233,19 @@ void Connector_RTMP::parseChunk(){
|
|||
#if DEBUG >= 4
|
||||
fprintf(stderr, "Received audio data\n");
|
||||
#endif
|
||||
F.ChunkLoader(next);
|
||||
if (SS.connected()){
|
||||
SS.write(std::string(F.data, F.len));
|
||||
}
|
||||
break;
|
||||
case 9:
|
||||
#if DEBUG >= 4
|
||||
fprintf(stderr, "Received video data\n");
|
||||
#endif
|
||||
F.ChunkLoader(next);
|
||||
if (SS.connected()){
|
||||
SS.write(std::string(F.data, F.len));
|
||||
}
|
||||
break;
|
||||
case 15:
|
||||
#if DEBUG >= 4
|
||||
|
@ -352,6 +350,62 @@ void Connector_RTMP::parseChunk(){
|
|||
Socket.write(RTMPStream::SendChunk(3, 17, next.msg_stream_id, (char)0+amfreply.Pack()));
|
||||
parsed3 = true;
|
||||
}//getStreamLength
|
||||
if ((amfdata.getContentP(0)->StrValue() == "publish")){
|
||||
if (amfdata.getContentP(3)){
|
||||
streamname = amfdata.getContentP(3)->StrValue();
|
||||
for (std::string::iterator i=streamname.begin(); i != streamname.end(); ++i){
|
||||
if (*i == '?'){streamname.erase(i, streamname.end()); break;}
|
||||
if (!isalpha(*i) && !isdigit(*i)){
|
||||
streamname.erase(i);
|
||||
--i;
|
||||
}else{
|
||||
*i=tolower(*i);
|
||||
}
|
||||
}
|
||||
streamname = "/tmp/shared_socket_" + streamname;
|
||||
#if DEBUG >= 4
|
||||
fprintf(stderr, "Connecting to buffer %s...\n", streamname.c_str());
|
||||
#endif
|
||||
SS = Socket::Connection(streamname);
|
||||
if (!SS.connected()){
|
||||
#if DEBUG >= 1
|
||||
fprintf(stderr, "Could not connect to server!\n");
|
||||
#endif
|
||||
Socket.close();//disconnect user
|
||||
break;
|
||||
}
|
||||
SS.write(Socket.getHost()+'\n');
|
||||
#if DEBUG >= 4
|
||||
fprintf(stderr, "Connected to buffer, starting to sent data...\n");
|
||||
#endif
|
||||
}
|
||||
//send a _result reply
|
||||
AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER);
|
||||
amfreply.addContent(AMF::Object("", "_result"));//result success
|
||||
amfreply.addContent(amfdata.getContent(1));//same transaction ID
|
||||
amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info
|
||||
amfreply.addContent(AMF::Object("", 1, AMF::AMF0_BOOL));//publish success?
|
||||
#if DEBUG >= 4
|
||||
amfreply.Print();
|
||||
#endif
|
||||
Socket.write(RTMPStream::SendChunk(3, 17, next.msg_stream_id, (char)0+amfreply.Pack()));
|
||||
Socket.write(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1
|
||||
//send a status reply
|
||||
amfreply = AMF::Object("container", AMF::AMF0_DDV_CONTAINER);
|
||||
amfreply.addContent(AMF::Object("", "onStatus"));//status reply
|
||||
amfreply.addContent(AMF::Object("", 0, AMF::AMF0_NUMBER));//same transaction ID
|
||||
amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info
|
||||
amfreply.addContent(AMF::Object(""));//info
|
||||
amfreply.getContentP(3)->addContent(AMF::Object("level", "status"));
|
||||
amfreply.getContentP(3)->addContent(AMF::Object("code", "NetStream.Publish.Start"));
|
||||
amfreply.getContentP(3)->addContent(AMF::Object("description", "Stream is now published!"));
|
||||
amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337));
|
||||
#if DEBUG >= 4
|
||||
amfreply.Print();
|
||||
#endif
|
||||
Socket.write(RTMPStream::SendChunk(3, 17, next.msg_stream_id, (char)0+amfreply.Pack()));
|
||||
parsed3 = true;
|
||||
}//getStreamLength
|
||||
if (amfdata.getContentP(0)->StrValue() == "checkBandwidth"){
|
||||
//send a _result reply
|
||||
AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER);
|
||||
|
@ -421,6 +475,10 @@ void Connector_RTMP::parseChunk(){
|
|||
#if DEBUG >= 4
|
||||
fprintf(stderr, "Received AFM0 data message (metadata)\n");
|
||||
#endif
|
||||
F.ChunkLoader(next);
|
||||
if (SS.connected()){
|
||||
SS.write(std::string(F.data, F.len));
|
||||
}
|
||||
break;
|
||||
case 19:
|
||||
#if DEBUG >= 4
|
||||
|
@ -441,12 +499,16 @@ void Connector_RTMP::parseChunk(){
|
|||
fprintf(stderr, "Object encoding set to %e\n", objencoding);
|
||||
#if DEBUG >= 4
|
||||
int tmpint;
|
||||
tmpint = (int)amfdata.getContentP(2)->getContentP("videoCodecs")->NumValue();
|
||||
if (tmpint & 0x04){fprintf(stderr, "Sorensen video support detected\n");}
|
||||
if (tmpint & 0x80){fprintf(stderr, "H264 video support detected\n");}
|
||||
tmpint = (int)amfdata.getContentP(2)->getContentP("audioCodecs")->NumValue();
|
||||
if (tmpint & 0x04){fprintf(stderr, "MP3 audio support detected\n");}
|
||||
if (tmpint & 0x400){fprintf(stderr, "AAC video support detected\n");}
|
||||
if (amfdata.getContentP(2)->getContentP("videoCodecs")){
|
||||
tmpint = (int)amfdata.getContentP(2)->getContentP("videoCodecs")->NumValue();
|
||||
if (tmpint & 0x04){fprintf(stderr, "Sorensen video support detected\n");}
|
||||
if (tmpint & 0x80){fprintf(stderr, "H264 video support detected\n");}
|
||||
}
|
||||
if (amfdata.getContentP(2)->getContentP("audioCodecs")){
|
||||
tmpint = (int)amfdata.getContentP(2)->getContentP("audioCodecs")->NumValue();
|
||||
if (tmpint & 0x04){fprintf(stderr, "MP3 audio support detected\n");}
|
||||
if (tmpint & 0x400){fprintf(stderr, "AAC video support detected\n");}
|
||||
}
|
||||
#endif
|
||||
RTMPStream::chunk_snd_max = 4096;
|
||||
Socket.write(RTMPStream::SendCTL(1, RTMPStream::chunk_snd_max));//send chunk size max (msg 1)
|
||||
|
@ -508,6 +570,62 @@ void Connector_RTMP::parseChunk(){
|
|||
Socket.write(RTMPStream::SendChunk(3, 20, next.msg_stream_id, amfreply.Pack()));
|
||||
parsed = true;
|
||||
}//getStreamLength
|
||||
if ((amfdata.getContentP(0)->StrValue() == "publish")){
|
||||
if (amfdata.getContentP(3)){
|
||||
streamname = amfdata.getContentP(3)->StrValue();
|
||||
for (std::string::iterator i=streamname.begin(); i != streamname.end(); ++i){
|
||||
if (*i == '?'){streamname.erase(i, streamname.end()); break;}
|
||||
if (!isalpha(*i) && !isdigit(*i)){
|
||||
streamname.erase(i);
|
||||
--i;
|
||||
}else{
|
||||
*i=tolower(*i);
|
||||
}
|
||||
}
|
||||
streamname = "/tmp/shared_socket_" + streamname;
|
||||
#if DEBUG >= 4
|
||||
fprintf(stderr, "Connecting to buffer %s...\n", streamname.c_str());
|
||||
#endif
|
||||
SS = Socket::Connection(streamname);
|
||||
if (!SS.connected()){
|
||||
#if DEBUG >= 1
|
||||
fprintf(stderr, "Could not connect to server!\n");
|
||||
#endif
|
||||
Socket.close();//disconnect user
|
||||
break;
|
||||
}
|
||||
SS.write(Socket.getHost()+'\n');
|
||||
#if DEBUG >= 4
|
||||
fprintf(stderr, "Connected to buffer, starting to send data...\n");
|
||||
#endif
|
||||
}
|
||||
//send a _result reply
|
||||
AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER);
|
||||
amfreply.addContent(AMF::Object("", "_result"));//result success
|
||||
amfreply.addContent(amfdata.getContent(1));//same transaction ID
|
||||
amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info
|
||||
amfreply.addContent(AMF::Object("", 1, AMF::AMF0_BOOL));//publish success?
|
||||
#if DEBUG >= 4
|
||||
amfreply.Print();
|
||||
#endif
|
||||
Socket.write(RTMPStream::SendChunk(3, 20, next.msg_stream_id, amfreply.Pack()));
|
||||
Socket.write(RTMPStream::SendUSR(0, 1));//send UCM StreamBegin (0), stream 1
|
||||
//send a status reply
|
||||
amfreply = AMF::Object("container", AMF::AMF0_DDV_CONTAINER);
|
||||
amfreply.addContent(AMF::Object("", "onStatus"));//status reply
|
||||
amfreply.addContent(AMF::Object("", 0, AMF::AMF0_NUMBER));//same transaction ID
|
||||
amfreply.addContent(AMF::Object("", (double)0, AMF::AMF0_NULL));//null - command info
|
||||
amfreply.addContent(AMF::Object(""));//info
|
||||
amfreply.getContentP(3)->addContent(AMF::Object("level", "status"));
|
||||
amfreply.getContentP(3)->addContent(AMF::Object("code", "NetStream.Publish.Start"));
|
||||
amfreply.getContentP(3)->addContent(AMF::Object("description", "Stream is now published!"));
|
||||
amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337));
|
||||
#if DEBUG >= 4
|
||||
amfreply.Print();
|
||||
#endif
|
||||
Socket.write(RTMPStream::SendChunk(4, 20, next.msg_stream_id, amfreply.Pack()));
|
||||
parsed = true;
|
||||
}//getStreamLength
|
||||
if (amfdata.getContentP(0)->StrValue() == "checkBandwidth"){
|
||||
//send a _result reply
|
||||
AMF::Object amfreply("container", AMF::AMF0_DDV_CONTAINER);
|
||||
|
|
|
@ -188,10 +188,25 @@ bool HTTP::Parser::parse(){
|
|||
}
|
||||
if (seenHeaders){
|
||||
if (length > 0){
|
||||
/// \todo Include POST variable parsing?
|
||||
if (HTTPbuffer.length() >= length){
|
||||
body = HTTPbuffer.substr(0, length);
|
||||
HTTPbuffer.erase(0, length);
|
||||
std::string tmppost = body;
|
||||
std::string varname;
|
||||
std::string varval;
|
||||
while (tmppost.find('=') != std::string::npos){
|
||||
size_t found = tmppost.find('=');
|
||||
varname = urlunescape((char*)tmppost.substr(0, found).c_str());
|
||||
tmppost.erase(0, found+1);
|
||||
found = tmppost.find('&');
|
||||
varval = urlunescape((char*)tmppost.substr(0, found).c_str());
|
||||
SetVar(varname, varval);
|
||||
if (found == std::string::npos){
|
||||
tmppost.clear();
|
||||
}else{
|
||||
tmppost.erase(0, found+1);
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}else{
|
||||
return false;
|
||||
|
@ -241,3 +256,29 @@ void HTTP::Parser::SendBodyPart(Socket::Connection & conn, std::string bodypart)
|
|||
conn.write(bodypart);
|
||||
}
|
||||
}
|
||||
|
||||
/// Unescapes URLencoded C-strings to a std::string.
|
||||
/// This function *will* destroy the input data!
|
||||
std::string HTTP::Parser::urlunescape(char *s){
|
||||
char *p;
|
||||
for (p = s; *s != '\0'; ++s){
|
||||
if (*s == '%'){
|
||||
if (*++s != '\0'){
|
||||
*p = unhex(*s) << 4;
|
||||
}
|
||||
if (*++s != '\0'){
|
||||
*p++ += unhex(*s);
|
||||
}
|
||||
} else {
|
||||
if (*s == '+'){*p++ = ' ';}else{*p++ = *s;}
|
||||
}
|
||||
}
|
||||
*p = '\0';
|
||||
return std::string(s);
|
||||
}
|
||||
|
||||
/// Helper function for urlunescape.
|
||||
/// Takes a single char input and outputs its integer hex value.
|
||||
int HTTP::Parser::unhex(char c){
|
||||
return( c >= '0' && c <= '9' ? c - '0' : c >= 'A' && c <= 'F' ? c - 'A' + 10 : c - 'a' + 10 );
|
||||
}
|
||||
|
|
|
@ -30,6 +30,7 @@ namespace HTTP{
|
|||
void SendBodyPart(Socket::Connection & conn, std::string bodypart);
|
||||
void Clean();
|
||||
bool CleanForNext();
|
||||
std::string urlunescape(char *s); ///< Unescapes URLencoded C-strings to a std::string.
|
||||
std::string body;
|
||||
std::string method;
|
||||
std::string url;
|
||||
|
@ -43,5 +44,6 @@ namespace HTTP{
|
|||
std::map<std::string, std::string> headers;
|
||||
std::map<std::string, std::string> vars;
|
||||
void Trim(std::string & s);
|
||||
int unhex(char c); ///< Helper function for urlunescape.
|
||||
};//HTTP::Parser class
|
||||
};//HTTP namespace
|
||||
|
|
|
@ -3,6 +3,11 @@
|
|||
/// Written by Jaron Vietor in 2010 for DDVTech
|
||||
|
||||
#include "socket.h"
|
||||
#include <poll.h>
|
||||
|
||||
#ifdef __FreeBSD__
|
||||
#include <netinet/in.h>
|
||||
#endif
|
||||
|
||||
/// Create a new base socket. This is a basic constructor for converting any valid socket to a Socket::Connection.
|
||||
/// \param sockNo Integer representing the socket to convert.
|
||||
|
@ -69,6 +74,27 @@ Socket::Connection::Connection(std::string address, bool nonblock){
|
|||
}
|
||||
}//Socket::Connection Unix Contructor
|
||||
|
||||
/// Calls poll() on the socket, checking if data is available.
|
||||
/// This function may return true even if there is no data, but never returns false when there is.
|
||||
bool Socket::Connection::canRead(){
|
||||
struct pollfd PFD;
|
||||
PFD.fd = sock;
|
||||
PFD.events = POLLIN;
|
||||
PFD.revents = 0;
|
||||
poll(&PFD, 1, 5);
|
||||
return (PFD.revents & POLLIN) == POLLIN;
|
||||
}
|
||||
/// Calls poll() on the socket, checking if data can be written.
|
||||
bool Socket::Connection::canWrite(){
|
||||
struct pollfd PFD;
|
||||
PFD.fd = sock;
|
||||
PFD.events = POLLOUT;
|
||||
PFD.revents = 0;
|
||||
poll(&PFD, 1, 5);
|
||||
return (PFD.revents & POLLOUT) == POLLOUT;
|
||||
}
|
||||
|
||||
|
||||
/// Returns the ready-state for this socket.
|
||||
/// \returns 1 if data is waiting to be read, -1 if not connected, 0 otherwise.
|
||||
signed int Socket::Connection::ready(){
|
||||
|
@ -348,7 +374,47 @@ Socket::Server::Server(int port, std::string hostname, bool nonblock){
|
|||
}
|
||||
}else{
|
||||
#if DEBUG >= 1
|
||||
fprintf(stderr, "Binding failed! Error: %s\n", strerror(errno));
|
||||
fprintf(stderr, "Binding failed, retrying as IPv4... (%s)\n", strerror(errno));
|
||||
#endif
|
||||
close();
|
||||
}
|
||||
sock = socket(AF_INET, SOCK_STREAM, 0);
|
||||
if (sock < 0){
|
||||
#if DEBUG >= 1
|
||||
fprintf(stderr, "Could not create socket! Error: %s\n", strerror(errno));
|
||||
#endif
|
||||
return;
|
||||
}
|
||||
on = 1;
|
||||
setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
|
||||
if (nonblock){
|
||||
int flags = fcntl(sock, F_GETFL, 0);
|
||||
flags |= O_NONBLOCK;
|
||||
fcntl(sock, F_SETFL, flags);
|
||||
}
|
||||
struct sockaddr_in addr4;
|
||||
addr4.sin_family = AF_INET;
|
||||
addr4.sin_port = htons(port);//set port
|
||||
if (hostname == "0.0.0.0"){
|
||||
addr4.sin_addr.s_addr = INADDR_ANY;
|
||||
}else{
|
||||
inet_pton(AF_INET, hostname.c_str(), &addr4.sin_addr);//set interface, 0.0.0.0 (default) is all
|
||||
}
|
||||
ret = bind(sock, (sockaddr*)&addr4, sizeof(addr4));//do the actual bind
|
||||
if (ret == 0){
|
||||
ret = listen(sock, 100);//start listening, backlog of 100 allowed
|
||||
if (ret == 0){
|
||||
return;
|
||||
}else{
|
||||
#if DEBUG >= 1
|
||||
fprintf(stderr, "Listen failed! Error: %s\n", strerror(errno));
|
||||
#endif
|
||||
close();
|
||||
return;
|
||||
}
|
||||
}else{
|
||||
#if DEBUG >= 1
|
||||
fprintf(stderr, "IPv4 binding also failed, giving up. (%s)\n", strerror(errno));
|
||||
#endif
|
||||
close();
|
||||
return;
|
||||
|
|
|
@ -27,6 +27,8 @@ namespace Socket{
|
|||
Connection(); ///< Create a new disconnected base socket.
|
||||
Connection(int sockNo); ///< Create a new base socket.
|
||||
Connection(std::string adres, bool nonblock = false); ///< Create a new Unix Socket.
|
||||
bool canRead(); ///< Calls poll() on the socket, checking if data is available.
|
||||
bool canWrite(); ///< Calls poll() on the socket, checking if data can be written.
|
||||
bool Error; ///< Set to true if a socket error happened.
|
||||
bool Blocking; ///< Set to true if a socket is currently or wants to be blocking.
|
||||
signed int ready(); ///< Returns the ready-state for this socket.
|
||||
|
|
Loading…
Add table
Reference in a new issue