Renamed all binaries with DDV_ prefix, updated Buffer for new systems, updated RAW connector, partly updated RTMP connector - ready for recode. All connectors now have host binding options.

This commit is contained in:
Thulinma 2011-04-09 03:24:29 +02:00
parent cc78697ba2
commit 2b22834fd8
11 changed files with 295 additions and 263 deletions

9
.gitignore vendored
View file

@ -1,10 +1,11 @@
#ignore object files and nonsense like that #ignore object files and nonsense like that
*.[oa] *.[oa]
Admin/main Admin/main
Connector_HTTP/Connector_HTTP Connector_HTTP/DDV_Conn_HTTP
Buffer/Buffer Buffer/DDV_Buffer
Connector_RTMP/Connector_RTMP Connector_RTMP/DDV_Conn_RTMP
Connector_RTSP/Connector_RTSP Connector_RTSP/DDV_Conn_RTSP
Connector_RAW/DDV_Conn_RAW
*~ *~
bin/* bin/*
docs docs

View file

@ -1,6 +1,6 @@
SRC = main.cpp SRC = main.cpp ../util/ddv_socket.cpp ../util/flv_tag.cpp
OBJ = $(SRC:.cpp=.o) OBJ = $(SRC:.cpp=.o)
OUT = Buffer OUT = DDV_Buffer
INCLUDES = INCLUDES =
CCFLAGS = -Wall -Wextra -funsigned-char -g CCFLAGS = -Wall -Wextra -funsigned-char -g
CC = $(CROSS)g++ CC = $(CROSS)g++

View file

@ -7,11 +7,13 @@
#include <string.h> #include <string.h>
#include <unistd.h> #include <unistd.h>
#include <signal.h> #include <signal.h>
#include "../util/flv.cpp" //FLV format parser #include "../util/flv_tag.h" //FLV format parser
#include "../util/ddv_socket.cpp" //DDV Socket lib #include "../util/ddv_socket.h" //DDV Socket lib
#include <sys/epoll.h> #include <sys/epoll.h>
namespace Buffer{
void termination_handler (int signum){ void termination_handler (int signum){
switch (signum){ switch (signum){
case SIGPIPE: return; break; case SIGPIPE: return; break;
@ -19,81 +21,90 @@ void termination_handler (int signum){
} }
} }
///holds FLV::Tag objects and their numbers
struct buffer{ struct buffer{
int number; int number;
bool iskeyframe; FLV::Tag FLV;
FLV_Pack * FLV;
buffer(){
number = -1;
iskeyframe = false;
FLV = 0;
}//constructor
};//buffer };//buffer
/// Holds connected users.
/// Keeps track of what buffer users are using and the connection status.
class user{ class user{
public: public:
int MyBuffer; int MyBuffer; ///< Index of currently used buffer.
int MyBuffer_num; int MyBuffer_num; ///< Number of currently used buffer.
int MyBuffer_len; int MyBuffer_len; ///< Length in bytes of currently used buffer.
int MyNum; int MyNum; ///< User ID of this user.
int currsend; int currsend; ///< Current amount of bytes sent.
bool gotproperaudio; bool gotproperaudio; ///< Whether the user received proper audio yet.
void * lastpointer; void * lastpointer; ///< Pointer to data part of current buffer.
static int UserCount; static int UserCount; ///< Global user counter.
int s; DDV::Socket S; ///< Connection to user
user(int fd){ /// Creates a new user from a newly connected socket.
s = fd; /// Also prints "User connected" text to stdout.
user(DDV::Socket fd){
S = fd;
MyNum = UserCount++; MyNum = UserCount++;
gotproperaudio = false; gotproperaudio = false;
std::cout << "User " << MyNum << " connected" << std::endl; std::cout << "User " << MyNum << " connected" << std::endl;
}//constructor }//constructor
/// Disconnects the current user. Doesn't do anything if already disconnected.
/// Prints "Disconnected user" to stdout if disconnect took place.
void Disconnect(std::string reason) { void Disconnect(std::string reason) {
if (s != -1) { if (S.connected()) {
close(s); S.close();
s = -1;
std::cout << "Disconnected user " << MyNum << ": " << reason << std::endl; std::cout << "Disconnected user " << MyNum << ": " << reason << std::endl;
} }
}//Disconnect }//Disconnect
bool doSend(char * buffer, int todo){ /// Tries to send the current buffer, returns true if success, false otherwise.
int r = send(s, buffer+currsend, todo-currsend, 0); /// Has a side effect of dropping the connection if send will never complete.
bool doSend(){
int r = S.iwrite((char*)lastpointer+currsend, MyBuffer_len-currsend);
if (r <= 0){ if (r <= 0){
if ((r < 0) && (errno == EWOULDBLOCK)){return false;} if ((r < 0) && (errno == EWOULDBLOCK)){return false;}
Disconnect("Connection closed"); Disconnect("Connection closed");
return false; return false;
} }
currsend += r; currsend += r;
return (currsend == todo); return (currsend == MyBuffer_len);
} }//doSend
/// Try to send data to this user. Disconnects if any problems occur.
/// \param ringbuf Array of buffers (FLV:Tag with ID attached)
/// \param buffers Count of elements in ringbuf
void Send(buffer ** ringbuf, int buffers){ void Send(buffer ** ringbuf, int buffers){
//TODO: Bij MP3: gotproperaudio - if false, stuur alleen als eerste byte is 0xFF en set op true //TODO: Bij MP3: gotproperaudio - if false, stuur alleen als eerste byte is 0xFF en set op true
//not connected? cancel if (!S.connected()){return;}//cancel if not connected
if (s < 0){return;}
//still waiting for next buffer? check it //still waiting for next buffer? check it
if (MyBuffer_num < 0){ if (MyBuffer_num < 0){
MyBuffer_num = ringbuf[MyBuffer]->number; MyBuffer_num = ringbuf[MyBuffer]->number;
//still waiting? don't crash - wait longer.
if (MyBuffer_num < 0){ if (MyBuffer_num < 0){
return; return; //still waiting? don't crash - wait longer.
}else{ }else{
MyBuffer_len = ringbuf[MyBuffer]->FLV->len; MyBuffer_len = ringbuf[MyBuffer]->FLV.len;
lastpointer = ringbuf[MyBuffer]->FLV->data; lastpointer = ringbuf[MyBuffer]->FLV.data;
} }
} }
if (lastpointer != ringbuf[MyBuffer]->FLV->data){
//do check for buffer resizes
if (lastpointer != ringbuf[MyBuffer]->FLV.data){
Disconnect("Buffer resize at wrong time... had to disconnect"); Disconnect("Buffer resize at wrong time... had to disconnect");
return; return;
} }
if (doSend(ringbuf[MyBuffer]->FLV->data, MyBuffer_len)){
//completed a send - switch to next buffer //try to complete a send
if (doSend()){
//switch to next buffer
if ((ringbuf[MyBuffer]->number != MyBuffer_num)){ if ((ringbuf[MyBuffer]->number != MyBuffer_num)){
//if corrupt data, warn and find keyframe
std::cout << "Warning: User " << MyNum << " was send corrupt video data and send to the next keyframe!" << std::endl; std::cout << "Warning: User " << MyNum << " was send corrupt video data and send to the next keyframe!" << std::endl;
int nocrashcount = 0; int nocrashcount = 0;
do{ do{
MyBuffer++; MyBuffer++;
nocrashcount++; nocrashcount++;
MyBuffer %= buffers; MyBuffer %= buffers;
}while(!ringbuf[MyBuffer]->FLV->isKeyframe && (nocrashcount < buffers)); }while(!ringbuf[MyBuffer]->FLV.isKeyframe && (nocrashcount < buffers));
//if keyframe not available, try again later
if (nocrashcount >= buffers){ if (nocrashcount >= buffers){
std::cout << "Warning: No keyframe found in buffers! Skipping search for now..." << std::endl; std::cout << "Warning: No keyframe found in buffers! Skipping search for now..." << std::endl;
return; return;
@ -110,25 +121,27 @@ class user{
}; };
int user::UserCount = 0; int user::UserCount = 0;
int main( int argc, char * argv[] ) { /// Starts a loop, waiting for connections to send video data to.
int Start(int argc, char ** argv) {
//first make sure no segpipe signals will kill us
struct sigaction new_action; struct sigaction new_action;
new_action.sa_handler = termination_handler; new_action.sa_handler = termination_handler;
sigemptyset (&new_action.sa_mask); sigemptyset (&new_action.sa_mask);
new_action.sa_flags = 0; new_action.sa_flags = 0;
sigaction (SIGPIPE, &new_action, NULL); sigaction (SIGPIPE, &new_action, NULL);
if (argc < 2) { //then check and parse the commandline
std::cout << "usage: " << argv[0] << " buffers_count [streamname]" << std::endl; if (argc < 3) {
std::cout << "usage: " << argv[0] << " buffers_count streamname" << std::endl;
return 1; return 1;
} }
std::string shared_socket = "/tmp/shared_socket"; std::string shared_socket = "/tmp/shared_socket_";
if (argc > 2){ shared_socket += argv[2];
shared_socket = argv[2];
shared_socket = "/tmp/shared_socket_" + shared_socket;
}
int metabuflen = 0; DDV::ServerSocket SS(shared_socket, true);
char * metabuffer = 0; FLV::Tag metadata;
FLV::Tag video_init;
FLV::Tag audio_init;
int buffers = atoi(argv[1]); int buffers = atoi(argv[1]);
buffer ** ringbuf = (buffer**) calloc (buffers,sizeof(buffer*)); buffer ** ringbuf = (buffer**) calloc (buffers,sizeof(buffer*));
std::vector<user> users; std::vector<user> users;
@ -137,14 +150,15 @@ int main( int argc, char * argv[] ) {
int current_buffer = 0; int current_buffer = 0;
int lastproper = 0;//last properly finished buffer number int lastproper = 0;//last properly finished buffer number
unsigned int loopcount = 0; unsigned int loopcount = 0;
int listener = DDV_UnixListen(shared_socket, true); DDV::Socket incoming;
int incoming = 0;
unsigned char packtype; unsigned char packtype;
bool gotVideoInfo = false; bool gotVideoInfo = false;
bool gotAudioInfo = false; bool gotAudioInfo = false;
int infile = fileno(stdin); int infile = fileno(stdin);//get file number for stdin
//add stdin to an epoll
int poller = epoll_create(1); int poller = epoll_create(1);
struct epoll_event ev; struct epoll_event ev;
ev.events = EPOLLIN; ev.events = EPOLLIN;
@ -153,54 +167,46 @@ int main( int argc, char * argv[] ) {
struct epoll_event events[1]; struct epoll_event events[1];
while(!feof(stdin) && !All_Hell_Broke_Loose){ while(!feof(stdin) && !FLV::Parse_Error){
//invalidate the current buffer //invalidate the current buffer
ringbuf[current_buffer]->number = -1; ringbuf[current_buffer]->number = -1;
if ((epoll_wait(poller, events, 1, 10) > 0) && FLV_GetPacket(ringbuf[current_buffer]->FLV)){ if ((epoll_wait(poller, events, 1, 10) > 0) && ringbuf[current_buffer]->FLV.FileLoader(stdin)){
loopcount++; loopcount++;
packtype = ringbuf[current_buffer]->FLV->data[0]; packtype = ringbuf[current_buffer]->FLV.data[0];
//store metadata, if available //store metadata, if available
if (packtype == 0x12){ if (packtype == 0x12){
metabuflen = ringbuf[current_buffer]->FLV->len; metadata = ringbuf[current_buffer]->FLV;
metabuffer = (char*)realloc(metabuffer, metabuflen);
memcpy(metabuffer, ringbuf[current_buffer]->FLV->data, metabuflen);
std::cout << "Received metadata!" << std::endl; std::cout << "Received metadata!" << std::endl;
if (gotVideoInfo && gotAudioInfo){ if (gotVideoInfo && gotAudioInfo){
All_Hell_Broke_Loose = true; FLV::Parse_Error = true;
std::cout << "... after proper video and audio? Cancelling broadcast!" << std::endl; std::cout << "... after proper video and audio? Cancelling broadcast!" << std::endl;
} }
gotVideoInfo = false; gotVideoInfo = false;
gotAudioInfo = false; gotAudioInfo = false;
} }
if (!gotVideoInfo && ringbuf[current_buffer]->FLV->isKeyframe){ //store video init data, if available
if ((ringbuf[current_buffer]->FLV->data[11] & 0x0f) == 7){//avc packet if (!gotVideoInfo && ringbuf[current_buffer]->FLV.isKeyframe){
if (ringbuf[current_buffer]->FLV->data[12] == 0){ if ((ringbuf[current_buffer]->FLV.data[11] & 0x0f) == 7){//avc packet
ringbuf[current_buffer]->FLV->data[4] = 0;//timestamp to zero if (ringbuf[current_buffer]->FLV.data[12] == 0){
ringbuf[current_buffer]->FLV->data[5] = 0;//timestamp to zero ringbuf[current_buffer]->FLV.tagTime(0);//timestamp to zero
ringbuf[current_buffer]->FLV->data[6] = 0;//timestamp to zero video_init = ringbuf[current_buffer]->FLV;
metabuffer = (char*)realloc(metabuffer, metabuflen + ringbuf[current_buffer]->FLV->len);
memcpy(metabuffer+metabuflen, ringbuf[current_buffer]->FLV->data, ringbuf[current_buffer]->FLV->len);
metabuflen += ringbuf[current_buffer]->FLV->len;
gotVideoInfo = true; gotVideoInfo = true;
std::cout << "Received video configuration!" << std::endl; std::cout << "Received video configuration!" << std::endl;
} }
}else{gotVideoInfo = true;}//non-avc = no config... }else{gotVideoInfo = true;}//non-avc = no config...
} }
//store audio init data, if available
if (!gotAudioInfo && (packtype == 0x08)){ if (!gotAudioInfo && (packtype == 0x08)){
if (((ringbuf[current_buffer]->FLV->data[11] & 0xf0) >> 4) == 10){//aac packet if (((ringbuf[current_buffer]->FLV.data[11] & 0xf0) >> 4) == 10){//aac packet
ringbuf[current_buffer]->FLV->data[4] = 0;//timestamp to zero ringbuf[current_buffer]->FLV.tagTime(0);//timestamp to zero
ringbuf[current_buffer]->FLV->data[5] = 0;//timestamp to zero audio_init = ringbuf[current_buffer]->FLV;
ringbuf[current_buffer]->FLV->data[6] = 0;//timestamp to zero
metabuffer = (char*)realloc(metabuffer, metabuflen + ringbuf[current_buffer]->FLV->len);
memcpy(metabuffer+metabuflen, ringbuf[current_buffer]->FLV->data, ringbuf[current_buffer]->FLV->len);
metabuflen += ringbuf[current_buffer]->FLV->len;
gotAudioInfo = true; gotAudioInfo = true;
std::cout << "Received audio configuration!" << std::endl; std::cout << "Received audio configuration!" << std::endl;
}else{gotAudioInfo = true;}//no aac = no config... }else{gotAudioInfo = true;}//no aac = no config...
} }
//on keyframe set start point //on keyframe set possible start point
if (packtype == 0x09){ if (packtype == 0x09){
if (((ringbuf[current_buffer]->FLV->data[11] & 0xf0) >> 4) == 1){ if (((ringbuf[current_buffer]->FLV.data[11] & 0xf0) >> 4) == 1){
lastproper = current_buffer; lastproper = current_buffer;
} }
} }
@ -209,29 +215,35 @@ int main( int argc, char * argv[] ) {
current_buffer++; current_buffer++;
current_buffer %= buffers; current_buffer %= buffers;
} }
//check for new connections, accept them if there are any //check for new connections, accept them if there are any
incoming = DDV_Accept(listener, true); incoming = SS.accept(true);
if (incoming >= 0){ if (incoming.connected()){
users.push_back(incoming); users.push_back(incoming);
//send the FLV header //send the FLV header
users.back().currsend = 0; users.back().currsend = 0;
users.back().MyBuffer = lastproper; users.back().MyBuffer = lastproper;
users.back().MyBuffer_num = -1; users.back().MyBuffer_num = -1;
//TODO: Do this more nicely? //TODO: Do this more nicely?
if (!DDV_write(FLVHeader, 13, incoming)){ if (!incoming.write(FLV::Header, 13)){
users.back().Disconnect("failed to receive the header!"); users.back().Disconnect("failed to receive the header!");
}else{ }else{
if (!DDV_write(metabuffer, metabuflen, incoming)){ if (!incoming.write(metadata.data, metadata.len)){
users.back().Disconnect("failed to receive metadata!"); users.back().Disconnect("failed to receive metadata!");
} }
if (!incoming.write(video_init.data, video_init.len)){
users.back().Disconnect("failed to receive video init!");
}
if (!incoming.write(audio_init.data, audio_init.len)){
users.back().Disconnect("failed to receive audio init!");
}
} }
} }
//send all connections what they need, if and when they need it //send all connections what they need, if and when they need it
if (users.size() > 0){ if (users.size() > 0){
for (usersIt = users.begin(); usersIt != users.end(); usersIt++){ for (usersIt = users.begin(); usersIt != users.end(); usersIt++){
if ((*usersIt).s == -1){ if (!(*usersIt).S.connected()){
users.erase(usersIt); break; users.erase(usersIt); break;
}else{ }else{
(*usersIt).Send(ringbuf, buffers); (*usersIt).Send(ringbuf, buffers);
@ -241,13 +253,23 @@ int main( int argc, char * argv[] ) {
}//main loop }//main loop
// disconnect listener // disconnect listener
std::cout << "Reached EOF of input" << std::endl; if (FLV::Parse_Error){
close(listener); std::cout << "FLV parse error" << std::endl;
}else{
std::cout << "Reached EOF of input" << std::endl;
}
SS.close();
while (users.size() > 0){ while (users.size() > 0){
for (usersIt = users.begin(); usersIt != users.end(); usersIt++){ for (usersIt = users.begin(); usersIt != users.end(); usersIt++){
(*usersIt).Disconnect("Shutting down..."); (*usersIt).Disconnect("Shutting down...");
if ((*usersIt).s == -1){users.erase(usersIt);break;} if (!(*usersIt).S.connected()){users.erase(usersIt);break;}
} }
} }
return 0; return 0;
} }
};//Buffer namespace
int main(int argc, char ** argv){
Buffer::Start(argc, argv);
}//main

View file

@ -3,8 +3,8 @@
# description: DDVTech HTTP Connector # description: DDVTech HTTP Connector
# processname: Connector_HTTP # processname: Connector_HTTP
prog="Connector_HTTP" prog="DDV_Conn_HTTP"
fullprog="/usr/bin/Connector_HTTP" fullprog="/usr/bin/DDV_Conn_HTTP"
RETVAL=0 RETVAL=0
start() { start() {

View file

@ -1,6 +1,6 @@
SRC = main.cpp ../util/ddv_socket.cpp ../util/http_parser.cpp ../util/flv_tag.cpp SRC = main.cpp ../util/ddv_socket.cpp ../util/http_parser.cpp ../util/flv_tag.cpp
OBJ = $(SRC:.cpp=.o) OBJ = $(SRC:.cpp=.o)
OUT = Connector_HTTP OUT = DDV_Conn_HTTP
INCLUDES = INCLUDES =
CCFLAGS = -Wall -Wextra -funsigned-char -g CCFLAGS = -Wall -Wextra -funsigned-char -g
CC = $(CROSS)g++ CC = $(CROSS)g++

View file

@ -1,6 +1,6 @@
SRC = main.cpp ../sockets/sw_base.cpp ../sockets/sw_inet.cpp ../sockets/sw_unix.cpp SRC = main.cpp ../util/ddv_socket.cpp
OBJ = $(SRC:.cpp=.o) OBJ = $(SRC:.cpp=.o)
OUT = Connector_RAW OUT = DDV_Conn_RAW
INCLUDES = INCLUDES =
CCFLAGS = -Wall -Wextra -funsigned-char -g CCFLAGS = -Wall -Wextra -funsigned-char -g
CC = $(CROSS)g++ CC = $(CROSS)g++

View file

@ -1,5 +1,5 @@
#include <iostream> #include <iostream>
#include "../sockets/SocketW.h" #include "../util/ddv_socket.h"
#include <string> #include <string>
#include <vector> #include <vector>
#include <cstdlib> #include <cstdlib>
@ -7,19 +7,24 @@
#include <unistd.h> #include <unistd.h>
#include <fcntl.h> #include <fcntl.h>
int main() { int main(int argc, char ** argv) {
SWUnixSocket mySocket; if (argc < 2){
std::string input; std::cout << "Usage: " << argv[0] << " stream_name" << std::endl;
std::cin >> input; return 1;
input = "/tmp/shared_socket_"+input;
mySocket.connect(input);
char buffer[500000];
int msg;
while(std::cout.good()) {
msg = mySocket.recv(&buffer[0],10000);
std::cout.write(buffer,msg);
} }
// disconnect std::string input;
mySocket.disconnect(); input = "/tmp/shared_socket_";
input += argv[1];
DDV::Socket S(input);
if (!S.connected()){
std::cout << "Could not open stream " << argv[1] << std::endl;
return 1;
}
char buffer[50000];
int msg;
while(std::cout.good() && S.read(buffer,50000)){
std::cout.write(buffer,50000);
}
S.close();
return 0; return 0;
} }

View file

@ -1,6 +1,6 @@
SRC = main.cpp SRC = main.cpp ../util/ddv_socket.cpp ../util/flv_tag.cpp
OBJ = $(SRC:.cpp=.o) OBJ = $(SRC:.cpp=.o)
OUT = Connector_RTMP OUT = DDV_Conn_RTMP
INCLUDES = INCLUDES =
STATIC = STATIC =
CCFLAGS = -Wall -Wextra -funsigned-char -g CCFLAGS = -Wall -Wextra -funsigned-char -g

View file

@ -16,164 +16,159 @@
#include <sys/wait.h> #include <sys/wait.h>
#include <sys/epoll.h> #include <sys/epoll.h>
#include <getopt.h> #include <getopt.h>
#include "../util/ddv_socket.h"
#include "../util/flv_tag.h"
//for connection to server
bool ready4data = false;//set to true when streaming starts
bool inited = false;
bool stopparsing = false;
timeval lastrec;
#define DEFAULT_PORT 1935
#include "../util/server_setup.cpp"
int CONN_fd = 0;
#include "parsechunks.cpp" //chunkstream parsing #include "parsechunks.cpp" //chunkstream parsing
#include "handshake.cpp" //handshaking #include "handshake.cpp" //handshaking
int mainHandler(int connection){ /// Holds all functions and data unique to the RTMP Connector
CONN_fd = connection; namespace Connector_RTMP{
unsigned int ts;
unsigned int fts = 0;
unsigned int ftst;
int ss;
FLV_Pack * tag = 0;
//first timestamp set //for connection to server
firsttime = getNowMS(); bool ready4data = false; ///< Set to true when streaming starts.
bool inited = false; ///< Set to true when ready to connect to Buffer.
bool stopparsing = false; ///< Set to true when all parsing needs to be cancelled.
timeval lastrec; ///< Timestamp of last received data.
if (doHandshake()){ DDV::Socket Socket; ///< Socket connected to user
#if DEBUG >= 4
fprintf(stderr, "Handshake succcess!\n"); /// Main Connector_RTMP function
int Connector_RTMP(DDV::Socket conn){
Socket = conn;
unsigned int ts;
unsigned int fts = 0;
unsigned int ftst;
DDV::Socket SS;
FLV::Tag tag = 0;
//first timestamp set
firsttime = getNowMS();
if (doHandshake()){
#if DEBUG >= 4
fprintf(stderr, "Handshake succcess!\n");
#endif
}else{
#if DEBUG >= 1
fprintf(stderr, "Handshake fail!\n");
#endif
return 0;
}
int retval;
int poller = epoll_create(1);
int sspoller = epoll_create(1);
struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = CONN_fd;
epoll_ctl(poller, EPOLL_CTL_ADD, CONN_fd, &ev);
struct epoll_event events[1];
#if DEBUG >= 5
//for writing whole stream to a file
FILE * tmpfile = 0;
char tmpstr[200];
#endif
while (Socket.connected() && !FLV::Parse_Error){
//only parse input if available or not yet init'ed
//rightnow = getNowMS();
retval = epoll_wait(poller, events, 1, 1);
if ((retval > 0) || !ready4data){// || (snd_cnt - snd_window_at >= snd_window_size)
switch (Socket.ready()){
case -1: break; //disconnected
case 0: break; //not ready yet
default: parseChunk(); break; //new data is waiting
}
}
if (ready4data){
if (!inited){
//we are ready, connect the socket!
SS = DDV::Socket(streamname);
if (!SS.connected()){
#if DEBUG >= 1
fprintf(stderr, "Could not connect to server!\n");
#endif
Socket.close();//disconnect user
break;
}
ev.events = EPOLLIN;
ev.data.fd = SS.getSocket();
epoll_ctl(sspoller, EPOLL_CTL_ADD, SS.getSocket(), &ev);
#if DEBUG >= 3
fprintf(stderr, "Everything connected, starting to send video data...\n");
#endif
inited = true;
}
retval = epoll_wait(sspoller, events, 1, 1);
switch (SS.ready()){
case -1:
#if DEBUG >= 1
fprintf(stderr, "Source socket is disconnected.\n");
#endif
Socket.close();//disconnect user
break;
case 0: break;//not ready yet
default:
if (tag.SockLoader(SS)){//able to read a full packet?
ts = tag.tagTime();
if (ts != 0){
if (fts == 0){fts = ts;ftst = getNowMS();}
ts -= fts;
tag.tagTime(ts);
ts += ftst;
}else{
ftst = getNowMS();
tag.tagTime(ftst);
}
SendMedia((unsigned char)tag.data[0], (unsigned char *)tag.data+11, tag.len-15, ts);
#if DEBUG >= 5
//write whole stream to a file
if (tmpfile == 0){
sprintf(tmpstr, "./tmpfile_socket_%i.flv", CONN_fd);
tmpfile = fopen(tmpstr, "w");
fwrite(FLVHeader, 13, 1, tmpfile);
}
fwrite(tag->data, tag->len, 1, tmpfile);
#endif
#if DEBUG >= 4
fprintf(stderr, "Sent a tag to %i\n", CONN_fd);
#endif
}
break;
}
}
//send ACK if we received a whole window
if ((rec_cnt - rec_window_at > rec_window_size)){
rec_window_at = rec_cnt;
SendCTL(3, rec_cnt);//send ack (msg 3)
}
}
SS.close();
Socket.close();
#if DEBUG >= 5
fclose(tmpfile);
#endif #endif
}else{
#if DEBUG >= 1 #if DEBUG >= 1
fprintf(stderr, "Handshake fail!\n"); if (FLV::Parse_Error){fprintf(stderr, "FLV Parse Error\n");}
fprintf(stderr, "User %i disconnected.\n", conn.getSocket());
if (inited){
fprintf(stderr, "Status was: inited\n");
}else{
if (ready4data){
fprintf(stderr, "Status was: ready4data\n");
}else{
fprintf(stderr, "Status was: connected\n");
}
}
#endif #endif
return 0; return 0;
} }//Connector_RTMP
int retval; };//Connector_RTMP namespace
int poller = epoll_create(1);
int sspoller = epoll_create(1);
struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = CONN_fd;
epoll_ctl(poller, EPOLL_CTL_ADD, CONN_fd, &ev);
struct epoll_event events[1];
#if DEBUG >= 5
//for writing whole stream to a file
FILE * tmpfile = 0;
char tmpstr[200];
#endif
while (!socketError && !All_Hell_Broke_Loose){
//only parse input if available or not yet init'ed
//rightnow = getNowMS();
retval = epoll_wait(poller, events, 1, 1);
if ((retval > 0) || !ready4data){// || (snd_cnt - snd_window_at >= snd_window_size)
switch (DDV_ready(CONN_fd)){
case 0:
socketError = true;
#if DEBUG >= 1
fprintf(stderr, "User socket is disconnected.\n");
#endif
break;
case -1: break;//not ready yet
default:
parseChunk();
break;
}
}
if (ready4data){
if (!inited){
//we are ready, connect the socket!
ss = DDV_OpenUnix(streamname);
if (ss <= 0){
#if DEBUG >= 1
fprintf(stderr, "Could not connect to server!\n");
#endif
socketError = 1;
break;
}
ev.events = EPOLLIN;
ev.data.fd = ss;
epoll_ctl(sspoller, EPOLL_CTL_ADD, ss, &ev);
#if DEBUG >= 3
fprintf(stderr, "Everything connected, starting to send video data...\n");
#endif
inited = true;
}
retval = epoll_wait(sspoller, events, 1, 1); // Load main server setup file, default port 1935, handler is Connector_RTMP::Connector_RTMP
switch (DDV_ready(ss)){ #define DEFAULT_PORT 1935
case 0: #define MAINHANDLER Connector_RTMP::Connector_RTMP
socketError = true; #include "../util/server_setup.cpp"
#if DEBUG >= 1
fprintf(stderr, "Source socket is disconnected.\n");
#endif
break;
case -1: break;//not ready yet
default:
if (FLV_GetPacket(tag, ss)){//able to read a full packet?
ts = tag->data[7] * 256*256*256;
ts += tag->data[4] * 256*256;
ts += tag->data[5] * 256;
ts += tag->data[6];
if (ts != 0){
if (fts == 0){fts = ts;ftst = getNowMS();}
ts -= fts;
tag->data[7] = ts / (256*256*256);
tag->data[4] = ts / (256*256);
tag->data[5] = ts / 256;
tag->data[6] = ts % 256;
ts += ftst;
}else{
ftst = getNowMS();
tag->data[7] = ftst / (256*256*256);
tag->data[4] = ftst / (256*256);
tag->data[5] = ftst / 256;
tag->data[6] = ftst % 256;
}
SendMedia((unsigned char)tag->data[0], (unsigned char *)tag->data+11, tag->len-15, ts);
#if DEBUG >= 5
//write whole stream to a file
if (tmpfile == 0){
sprintf(tmpstr, "./tmpfile_socket_%i.flv", CONN_fd);
tmpfile = fopen(tmpstr, "w");
fwrite(FLVHeader, 13, 1, tmpfile);
}
fwrite(tag->data, tag->len, 1, tmpfile);
#endif
#if DEBUG >= 4
fprintf(stderr, "Sent a tag to %i\n", CONN_fd);
#endif
}
break;
}
}
//send ACK if we received a whole window
if ((rec_cnt - rec_window_at > rec_window_size)){
rec_window_at = rec_cnt;
SendCTL(3, rec_cnt);//send ack (msg 3)
}
}
close(CONN_fd);
#if DEBUG >= 5
fclose(tmpfile);
#endif
if (inited) close(ss);
#if DEBUG >= 1
if (All_Hell_Broke_Loose){fprintf(stderr, "All Hell Broke Loose\n");}
fprintf(stderr, "User %i disconnected.\n", CONN_fd);
if (inited){
fprintf(stderr, "Status was: inited\n");
}else{
if (ready4data){
fprintf(stderr, "Status was: ready4data\n");
}else{
fprintf(stderr, "Status was: connected\n");
}
}
#endif
return 0;
}//mainHandler

View file

@ -8,6 +8,14 @@ DDV::Socket::Socket(int sockNo){
Blocking = false; Blocking = false;
}//DDV::Socket basic constructor }//DDV::Socket basic constructor
/// Create a new disconnected base socket. This is a basic constructor for placeholder purposes.
/// A socket created like this is always disconnected and should/could be overwritten at some point.
DDV::Socket::Socket(){
sock = -1;
Error = false;
Blocking = false;
}//DDV::Socket basic constructor
/// Close connection. The internal socket is closed and then set to -1. /// Close connection. The internal socket is closed and then set to -1.
void DDV::Socket::close(){ void DDV::Socket::close(){
#if DEBUG >= 3 #if DEBUG >= 3

View file

@ -19,6 +19,7 @@ namespace DDV{
private: private:
int sock; ///< Internally saved socket number. int sock; ///< Internally saved socket number.
public: public:
Socket(); ///< Create a new disconnected base socket.
Socket(int sockNo); ///< Create a new base socket. Socket(int sockNo); ///< Create a new base socket.
Socket(std::string adres, bool nonblock = false); ///< Create a new Unix Socket. Socket(std::string adres, bool nonblock = false); ///< Create a new Unix Socket.
bool Error; ///< Set to true if a socket error happened. bool Error; ///< Set to true if a socket error happened.