Merge branch 'master' of projectlivestream.com:pls

This commit is contained in:
Erik Zandvliet 2010-11-08 19:00:24 +01:00
commit 4a36e772d7
11 changed files with 619 additions and 245 deletions

View file

@ -1,3 +1,5 @@
#include <unistd.h>
#include <fcntl.h>
#include <iostream>
#include "../sockets/SocketW.h"
#include <string>
@ -9,6 +11,8 @@
#include "../util/flv.cpp" //FLV format parser
#include "user.cpp"
#include <sys/epoll.h>
int get_empty( user ** list, int amount ) {
for (int i = 0; i < amount; i++ ){
if (!list[i]->is_connected){return i;}
@ -47,15 +51,26 @@ int main( int argc, char * argv[] ) {
unsigned char packtype;
bool gotVideoInfo = false;
bool gotAudioInfo = false;
while(std::cin.good() && std::cout.good()) {
loopcount ++;
//set stdin to be nonblocking
//int flags = fcntl(0, F_GETFL, 0);
//flags |= O_NONBLOCK;
//fcntl(0, F_SETFL, flags);
int infile = fileno(stdin);
int poller = epoll_create(1);
struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = infile;
epoll_ctl(poller, EPOLL_CTL_ADD, infile, &ev);
struct epoll_event events[1];
while(!feof(stdin) && !All_Hell_Broke_Loose){
//invalidate the current buffer
ringbuf[current_buffer]->number = -1;
if (std::cin.peek() == 'F') {
//new FLV file, read the file header again.
FLV_Readheader();
} else {
FLV_GetPacket(ringbuf[current_buffer]->FLV);
if ((epoll_wait(poller, events, 1, 100) > 0) && FLV_GetPacket(ringbuf[current_buffer]->FLV)){
loopcount ++;
packtype = ringbuf[current_buffer]->FLV->data[0];
//store metadata, if available
if (packtype == 0x12){
@ -63,6 +78,10 @@ int main( int argc, char * argv[] ) {
metabuffer = (char*)realloc(metabuffer, metabuflen);
memcpy(metabuffer, ringbuf[current_buffer]->FLV->data, metabuflen);
std::cout << "Received metadata!" << std::endl;
if (gotVideoInfo && gotAudioInfo){
All_Hell_Broke_Loose = true;
std::cout << "... after proper video and audio? Cancelling broadcast!" << std::endl;
}
gotVideoInfo = false;
gotAudioInfo = false;
}
@ -96,6 +115,13 @@ int main( int argc, char * argv[] ) {
if (packtype == 0x09){
if (((ringbuf[current_buffer]->FLV->data[11] & 0xf0) >> 4) == 1){lastproper = current_buffer;}
}
//keep track of buffers
current_buffer++;
current_buffer %= buffers;
ringbuf[current_buffer]->number = loopcount;
}
//check for new connections, accept them if there are any
incoming = listener.accept(&BError);
if (incoming){
connectionList.push_back(user(incoming));
@ -114,17 +140,14 @@ int main( int argc, char * argv[] ) {
connectionList.back().disconnect("Socket error: " + BError.get_error());
}
}
ringbuf[current_buffer]->number = loopcount;
//send all connections what they need, if and when they need it
if (connectionList.size() > 0){
for (connIt = connectionList.begin(); connIt != connectionList.end(); connIt++){
if (!(*connIt).is_connected){connectionList.erase(connIt);break;}
(*connIt).Send(ringbuf, buffers);
}
//keep track of buffers
current_buffer++;
current_buffer %= buffers;
}
}
}//main loop
// disconnect listener
std::cout << "Reached EOF of input" << std::endl;

58
Connector_RTMP/Conn_RTMP Executable file
View file

@ -0,0 +1,58 @@
#!/bin/sh
#
# chkconfig: 345 92 8
# description: DDVTech RTMP Connector
#
# processname: Connector_RTMP
. /etc/rc.d/init.d/functions
prog="Connector_RTMP"
fullprog="/usr/bin/Connector_RTMP"
RETVAL=0
start() {
gprintf "Starting %s: " $prog
daemon --user=root $fullprog
RETVAL=$?
echo
[ $RETVAL -eq 0 ] && touch /var/lock/subsys/$prog
return $RETVAL
}
stop() {
gprintf "Stopping %s: " $prog
killproc $fullprog
RETVAL=$?
echo
[ $RETVAL -eq 0 ] && rm -f /var/lock/subsys/$prog
return $RETVAL
}
case "$1" in
start)
start
;;
stop)
stop
;;
restart)
stop
start
;;
condrestart)
if [ -f /var/lock/subsys/$prog ]; then
stop
start
fi
;;
status)
status $fullprog
RETVAL=$?
;;
*)
gprintf "Usage: %s {start|stop|restart|status}" $0
RETVAL=1
esac
exit $RETVAL

View file

@ -16,8 +16,8 @@ $(OUT): $(OBJ) chunkstream.cpp parsechunks.cpp handshake.cpp crypto.cpp amf.cpp
$(CC) $(LIBS) -o $(OUT) $(OBJ)
clean:
rm -rf $(OBJ) $(OUT) Makefile.bak *~
run-test: $(OUT)
rm -rf ./meh
mkfifo ./meh
cat ./meh &
nc -l -p 1935 -e './Connector_RTMP 2>./meh'
install: $(OUT)
-service Conn_RTMP stop
cp -f ./$(OUT) /usr/bin/
cp -f ./Conn_RTMP /etc/init.d/
service Conn_RTMP start

View file

@ -108,18 +108,18 @@ void SendChunk(chunkpack ch){
}
}
if (ch.cs_id <= 63){
tmp = chtype | ch.cs_id; fwrite(&tmp, 1, 1, stdout);
tmp = chtype | ch.cs_id; DDV_write(&tmp, 1, 1, CONN_fd);
snd_cnt+=1;
}else{
if (ch.cs_id <= 255+64){
tmp = chtype | 0; fwrite(&tmp, 1, 1, stdout);
tmp = ch.cs_id - 64; fwrite(&tmp, 1, 1, stdout);
tmp = chtype | 0; DDV_write(&tmp, 1, 1, CONN_fd);
tmp = ch.cs_id - 64; DDV_write(&tmp, 1, 1, CONN_fd);
snd_cnt+=2;
}else{
tmp = chtype | 1; fwrite(&tmp, 1, 1, stdout);
tmp = chtype | 1; DDV_write(&tmp, 1, 1, CONN_fd);
tmpi = ch.cs_id - 64;
tmp = tmpi % 256; fwrite(&tmp, 1, 1, stdout);
tmp = tmpi / 256; fwrite(&tmp, 1, 1, stdout);
tmp = tmpi % 256; DDV_write(&tmp, 1, 1, CONN_fd);
tmp = tmpi / 256; DDV_write(&tmp, 1, 1, CONN_fd);
snd_cnt+=3;
}
}
@ -129,67 +129,67 @@ void SendChunk(chunkpack ch){
if (chtype == 0x00){
tmpi = ch.timestamp;
if (tmpi >= 0x00ffffff){ntime = tmpi; tmpi = 0x00ffffff;}
tmp = tmpi / (256*256); fwrite(&tmp, 1, 1, stdout);
tmp = tmpi / 256; fwrite(&tmp, 1, 1, stdout);
tmp = tmpi % 256; fwrite(&tmp, 1, 1, stdout);
tmp = tmpi / (256*256); DDV_write(&tmp, 1, 1, CONN_fd);
tmp = tmpi / 256; DDV_write(&tmp, 1, 1, CONN_fd);
tmp = tmpi % 256; DDV_write(&tmp, 1, 1, CONN_fd);
snd_cnt+=3;
}else{
tmpi = ch.timestamp - prev.timestamp;
if (tmpi >= 0x00ffffff){ntime = tmpi; tmpi = 0x00ffffff;}
tmp = tmpi / (256*256); fwrite(&tmp, 1, 1, stdout);
tmp = tmpi / 256; fwrite(&tmp, 1, 1, stdout);
tmp = tmpi % 256; fwrite(&tmp, 1, 1, stdout);
tmp = tmpi / (256*256); DDV_write(&tmp, 1, 1, CONN_fd);
tmp = tmpi / 256; DDV_write(&tmp, 1, 1, CONN_fd);
tmp = tmpi % 256; DDV_write(&tmp, 1, 1, CONN_fd);
snd_cnt+=3;
}
if (chtype != 0x80){
//len
tmpi = ch.len;
tmp = tmpi / (256*256); fwrite(&tmp, 1, 1, stdout);
tmp = tmpi / 256; fwrite(&tmp, 1, 1, stdout);
tmp = tmpi % 256; fwrite(&tmp, 1, 1, stdout);
tmp = tmpi / (256*256); DDV_write(&tmp, 1, 1, CONN_fd);
tmp = tmpi / 256; DDV_write(&tmp, 1, 1, CONN_fd);
tmp = tmpi % 256; DDV_write(&tmp, 1, 1, CONN_fd);
snd_cnt+=3;
//msg type id
tmp = ch.msg_type_id; fwrite(&tmp, 1, 1, stdout);
tmp = ch.msg_type_id; DDV_write(&tmp, 1, 1, CONN_fd);
snd_cnt+=1;
if (chtype != 0x40){
//msg stream id
tmp = ch.msg_stream_id % 256; fwrite(&tmp, 1, 1, stdout);
tmp = ch.msg_stream_id / 256; fwrite(&tmp, 1, 1, stdout);
tmp = ch.msg_stream_id / (256*256); fwrite(&tmp, 1, 1, stdout);
tmp = ch.msg_stream_id / (256*256*256); fwrite(&tmp, 1, 1, stdout);
tmp = ch.msg_stream_id % 256; DDV_write(&tmp, 1, 1, CONN_fd);
tmp = ch.msg_stream_id / 256; DDV_write(&tmp, 1, 1, CONN_fd);
tmp = ch.msg_stream_id / (256*256); DDV_write(&tmp, 1, 1, CONN_fd);
tmp = ch.msg_stream_id / (256*256*256); DDV_write(&tmp, 1, 1, CONN_fd);
snd_cnt+=4;
}
}
}
//support for 0x00ffffff timestamps
if (ntime){
tmp = ntime / (256*256*256); fwrite(&tmp, 1, 1, stdout);
tmp = ntime / (256*256); fwrite(&tmp, 1, 1, stdout);
tmp = ntime / 256; fwrite(&tmp, 1, 1, stdout);
tmp = ntime % 256; fwrite(&tmp, 1, 1, stdout);
tmp = ntime / (256*256*256); DDV_write(&tmp, 1, 1, CONN_fd);
tmp = ntime / (256*256); DDV_write(&tmp, 1, 1, CONN_fd);
tmp = ntime / 256; DDV_write(&tmp, 1, 1, CONN_fd);
tmp = ntime % 256; DDV_write(&tmp, 1, 1, CONN_fd);
snd_cnt+=4;
}
ch.len_left = 0;
while (ch.len_left < ch.len){
tmpi = ch.len - ch.len_left;
if (tmpi > chunk_snd_max){tmpi = chunk_snd_max;}
fwrite((ch.data + ch.len_left), 1, tmpi, stdout);
DDV_write((ch.data + ch.len_left), 1, tmpi, CONN_fd);
snd_cnt+=tmpi;
ch.len_left += tmpi;
if (ch.len_left < ch.len){
if (ch.cs_id <= 63){
tmp = 0xC0 + ch.cs_id; fwrite(&tmp, 1, 1, stdout);
tmp = 0xC0 + ch.cs_id; DDV_write(&tmp, 1, 1, CONN_fd);
snd_cnt+=1;
}else{
if (ch.cs_id <= 255+64){
tmp = 0xC0; fwrite(&tmp, 1, 1, stdout);
tmp = ch.cs_id - 64; fwrite(&tmp, 1, 1, stdout);
tmp = 0xC0; DDV_write(&tmp, 1, 1, CONN_fd);
tmp = ch.cs_id - 64; DDV_write(&tmp, 1, 1, CONN_fd);
snd_cnt+=2;
}else{
tmp = 0xC1; fwrite(&tmp, 1, 1, stdout);
tmp = 0xC1; DDV_write(&tmp, 1, 1, CONN_fd);
tmpi = ch.cs_id - 64;
tmp = tmpi % 256; fwrite(&tmp, 1, 1, stdout);
tmp = tmpi / 256; fwrite(&tmp, 1, 1, stdout);
tmp = tmpi % 256; DDV_write(&tmp, 1, 1, CONN_fd);
tmp = tmpi / 256; DDV_write(&tmp, 1, 1, CONN_fd);
snd_cnt+=4;
}
}
@ -310,19 +310,19 @@ struct chunkpack getChunk(){
gettimeofday(&lastrec, 0);
struct chunkpack ret;
unsigned char temp;
fread(&(ret.chunktype), 1, 1, stdin);
DDV_read(&(ret.chunktype), 1, 1, CONN_fd);
rec_cnt++;
//read the chunkstream ID properly
switch (ret.chunktype & 0x3F){
case 0:
fread(&temp, 1, 1, stdin);
DDV_read(&temp, 1, 1, CONN_fd);
rec_cnt++;
ret.cs_id = temp + 64;
break;
case 1:
fread(&temp, 1, 1, stdin);
DDV_read(&temp, 1, 1, CONN_fd);
ret.cs_id = temp + 64;
fread(&temp, 1, 1, stdin);
DDV_read(&temp, 1, 1, CONN_fd);
ret.cs_id += temp * 256;
rec_cnt+=2;
break;
@ -334,57 +334,57 @@ struct chunkpack getChunk(){
//process the rest of the header, for each chunk type
switch (ret.chunktype & 0xC0){
case 0x00:
fread(&temp, 1, 1, stdin);
DDV_read(&temp, 1, 1, CONN_fd);
ret.timestamp = temp*256*256;
fread(&temp, 1, 1, stdin);
DDV_read(&temp, 1, 1, CONN_fd);
ret.timestamp += temp*256;
fread(&temp, 1, 1, stdin);
DDV_read(&temp, 1, 1, CONN_fd);
ret.timestamp += temp;
fread(&temp, 1, 1, stdin);
DDV_read(&temp, 1, 1, CONN_fd);
ret.len = temp*256*256;
fread(&temp, 1, 1, stdin);
DDV_read(&temp, 1, 1, CONN_fd);
ret.len += temp*256;
fread(&temp, 1, 1, stdin);
DDV_read(&temp, 1, 1, CONN_fd);
ret.len += temp;
ret.len_left = 0;
fread(&temp, 1, 1, stdin);
DDV_read(&temp, 1, 1, CONN_fd);
ret.msg_type_id = temp;
fread(&temp, 1, 1, stdin);
DDV_read(&temp, 1, 1, CONN_fd);
ret.msg_stream_id = temp;
fread(&temp, 1, 1, stdin);
DDV_read(&temp, 1, 1, CONN_fd);
ret.msg_stream_id += temp*256;
fread(&temp, 1, 1, stdin);
DDV_read(&temp, 1, 1, CONN_fd);
ret.msg_stream_id += temp*256*256;
fread(&temp, 1, 1, stdin);
DDV_read(&temp, 1, 1, CONN_fd);
ret.msg_stream_id += temp*256*256*256;
rec_cnt+=11;
break;
case 0x40:
fread(&temp, 1, 1, stdin);
DDV_read(&temp, 1, 1, CONN_fd);
ret.timestamp = temp*256*256;
fread(&temp, 1, 1, stdin);
DDV_read(&temp, 1, 1, CONN_fd);
ret.timestamp += temp*256;
fread(&temp, 1, 1, stdin);
DDV_read(&temp, 1, 1, CONN_fd);
ret.timestamp += temp;
ret.timestamp += prev.timestamp;
fread(&temp, 1, 1, stdin);
DDV_read(&temp, 1, 1, CONN_fd);
ret.len = temp*256*256;
fread(&temp, 1, 1, stdin);
DDV_read(&temp, 1, 1, CONN_fd);
ret.len += temp*256;
fread(&temp, 1, 1, stdin);
DDV_read(&temp, 1, 1, CONN_fd);
ret.len += temp;
ret.len_left = 0;
fread(&temp, 1, 1, stdin);
DDV_read(&temp, 1, 1, CONN_fd);
ret.msg_type_id = temp;
ret.msg_stream_id = prev.msg_stream_id;
rec_cnt+=7;
break;
case 0x80:
fread(&temp, 1, 1, stdin);
DDV_read(&temp, 1, 1, CONN_fd);
ret.timestamp = temp*256*256;
fread(&temp, 1, 1, stdin);
DDV_read(&temp, 1, 1, CONN_fd);
ret.timestamp += temp*256;
fread(&temp, 1, 1, stdin);
DDV_read(&temp, 1, 1, CONN_fd);
ret.timestamp += temp;
ret.timestamp += prev.timestamp;
ret.len = prev.len;
@ -414,20 +414,20 @@ struct chunkpack getChunk(){
}
//read extended timestamp, if neccesary
if (ret.timestamp == 0x00ffffff){
fread(&temp, 1, 1, stdin);
DDV_read(&temp, 1, 1, CONN_fd);
ret.timestamp = temp*256*256*256;
fread(&temp, 1, 1, stdin);
DDV_read(&temp, 1, 1, CONN_fd);
ret.timestamp += temp*256*256;
fread(&temp, 1, 1, stdin);
DDV_read(&temp, 1, 1, CONN_fd);
ret.timestamp += temp*256;
fread(&temp, 1, 1, stdin);
DDV_read(&temp, 1, 1, CONN_fd);
ret.timestamp += temp;
rec_cnt+=4;
}
//read data if length > 0, and allocate it
if (ret.real_len > 0){
ret.data = (unsigned char*)malloc(ret.real_len);
fread(ret.data, 1, ret.real_len, stdin);
DDV_read(ret.data, 1, ret.real_len, CONN_fd);
rec_cnt+=ret.real_len;
}else{
ret.data = 0;
@ -484,7 +484,7 @@ chunkpack getWholeChunk(){
if (!clean){gwc_complete.data = 0; clean = true;}//prevent brain damage
chunkpack * ret = 0;
scrubChunk(gwc_complete);
while (counter < 10000){
while (counter < 1000){
gwc_next = getChunk();
ret = AddChunkPart(gwc_next);
scrubChunk(gwc_next);
@ -493,7 +493,7 @@ chunkpack getWholeChunk(){
free(ret);//cleanup returned chunk
return gwc_complete;
}
if (feof(stdin) != 0){break;}
if (socketError || socketBlocking){break;}
counter++;
}
gwc_complete.msg_type_id = 0;

View file

@ -14,11 +14,11 @@ bool doHandshake(){
Handshake Client;
Handshake Server;
/** Read C0 **/
fread(&(Version), 1, 1, stdin);
DDV_read(&(Version), 1, 1, CONN_fd);
/** Read C1 **/
fread(Client.Time, 1, 4, stdin);
fread(Client.Zero, 1, 4, stdin);
fread(Client.Random, 1, 1528, stdin);
DDV_read(Client.Time, 1, 4, CONN_fd);
DDV_read(Client.Zero, 1, 4, CONN_fd);
DDV_read(Client.Random, 1, 1528, CONN_fd);
rec_cnt+=1537;
/** Build S1 Packet **/
Server.Time[0] = 0; Server.Time[1] = 0; Server.Time[2] = 0; Server.Time[3] = 0;
@ -27,25 +27,25 @@ bool doHandshake(){
Server.Random[i] = versionstring[i%13];
}
/** Send S0 **/
fwrite(&(Version), 1, 1, stdout);
DDV_write(&(Version), 1, 1, CONN_fd);
/** Send S1 **/
fwrite(Server.Time, 1, 4, stdout);
fwrite(Server.Zero, 1, 4, stdout);
fwrite(Server.Random, 1, 1528, stdout);
DDV_write(Server.Time, 1, 4, CONN_fd);
DDV_write(Server.Zero, 1, 4, CONN_fd);
DDV_write(Server.Random, 1, 1528, CONN_fd);
/** Flush output, just for certainty **/
fflush(stdout);
//fflush(CONN_fd);
snd_cnt+=1537;
/** Send S2 **/
fwrite(Client.Time, 1, 4, stdout);
fwrite(Client.Time, 1, 4, stdout);
fwrite(Client.Random, 1, 1528, stdout);
DDV_write(Client.Time, 1, 4, CONN_fd);
DDV_write(Client.Time, 1, 4, CONN_fd);
DDV_write(Client.Random, 1, 1528, CONN_fd);
snd_cnt+=1536;
/** Flush, necessary in order to work **/
fflush(stdout);
//fflush(CONN_fd);
/** Read and discard C2 **/
fread(Client.Time, 1, 4, stdin);
fread(Client.Zero, 1, 4, stdin);
fread(Client.Random, 1, 1528, stdin);
DDV_read(Client.Time, 1, 4, CONN_fd);
DDV_read(Client.Zero, 1, 4, CONN_fd);
DDV_read(Client.Random, 1, 1528, CONN_fd);
rec_cnt+=1536;
return true;
}//doHandshake
@ -57,10 +57,10 @@ bool doHandshake(){
bool doHandshake(){
char Version;
/** Read C0 **/
fread(&Version, 1, 1, stdin);
DDV_read(&Version, 1, 1, CONN_fd);
uint8_t Client[1536];
uint8_t Server[3072];
fread(&Client, 1, 1536, stdin);
DDV_read(&Client, 1, 1536, CONN_fd);
rec_cnt+=1537;
/** Build S1 Packet **/
@ -123,13 +123,13 @@ bool doHandshake(){
delete[] pLastHash;
//***** DONE BUILDING THE RESPONSE ***//
/** Send response **/
fwrite(&Version, 1, 1, stdout);
fwrite(&Server, 1, 3072, stdout);
DDV_write(&Version, 1, 1, CONN_fd);
DDV_write(&Server, 1, 3072, CONN_fd);
snd_cnt+=3073;
/** Flush, necessary in order to work **/
fflush(stdout);
//fflush(CONN_fd);
/** Read and discard C2 **/
fread(Client, 1, 1536, stdin);
DDV_read(Client, 1, 1536, CONN_fd);
rec_cnt+=1536;
return true;
}

View file

@ -1,38 +1,72 @@
#undef DEBUG
#define DEBUG
#include <iostream>
#include <cstdlib>
#include <cstdio>
#include <cmath>
//needed for select
#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
#include <signal.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <sys/epoll.h>
//for connection to server
#include "../sockets/SocketW.h"
bool ready4data = false;//set to true when streaming starts
bool inited = false;
bool stopparsing = false;
timeval lastrec;
int CONN_fd = 0;
#include "../util/ddv_socket.cpp" //DDVTech Socket wrapper
#include "../util/flv_sock.cpp" //FLV parsing with SocketW
#include "parsechunks.cpp" //chunkstream parsing
#include "handshake.cpp" //handshaking
#include "../util/flv_sock.cpp" //FLV parsing with SocketW
int main(){
int server_socket = 0;
void termination_handler (int signum){
if (server_socket == 0) return;
close(server_socket);
server_socket = 0;
}
int main(int argc, char ** argv){
//setup signal handler
struct sigaction new_action;
new_action.sa_handler = termination_handler;
sigemptyset (&new_action.sa_mask);
new_action.sa_flags = 0;
sigaction (SIGINT, &new_action, NULL);
sigaction (SIGHUP, &new_action, NULL);
sigaction (SIGTERM, &new_action, NULL);
server_socket = DDV_Listen(1935);
if ((argc < 2) || (argv[1] == "nd")){
if (server_socket > 0){daemon(1, 0);}else{return 1;}
}
int status;
while (server_socket > 0){
waitpid((pid_t)-1, &status, WNOHANG);
CONN_fd = DDV_Accept(server_socket);
if (CONN_fd > 0){
pid_t myid = fork();
if (myid == 0){
break;
}else{
printf("Spawned new process %i for handling socket %i\n", (int)myid, CONN_fd);
}
}
}
if (server_socket <= 0){
return 0;
}
unsigned int ts;
unsigned int fts = 0;
unsigned int ftst;
SWUnixSocket ss;
fd_set pollset;
struct timeval timeout;
//0 timeout - return immediately after select call
timeout.tv_sec = 1; timeout.tv_usec = 0;
FD_ZERO(&pollset);//clear the polling set
FD_SET(0, &pollset);//add stdin to polling set
int ss;
FLV_Pack * tag;
//first timestamp set
firsttime = getNowMS();
@ -53,58 +87,71 @@ int main(){
#ifdef DEBUG
fprintf(stderr, "Starting processing...\n");
#endif
while (std::cin.good() && std::cout.good()){
//select(1, &pollset, 0, 0, &timeout);
//only parse input from stdin if available or not yet init'ed
//FD_ISSET(0, &pollset) || //NOTE: Polling does not work? WHY?!? WHY DAMN IT?!?
if ((!ready4data || (snd_cnt - snd_window_at >= snd_window_size)) && !stopparsing){parseChunk();fflush(stdout);}
int retval;
int poller = epoll_create(1);
struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = CONN_fd;
epoll_ctl(poller, EPOLL_CTL_ADD, CONN_fd, &ev);
struct epoll_event events[1];
while (!socketError && !All_Hell_Broke_Loose){
//only parse input if available or not yet init'ed
//rightnow = getNowMS();
retval = epoll_wait(poller, events, 1, 0);
if (!ready4data || (snd_cnt - snd_window_at >= snd_window_size)){
if (DDV_ready(CONN_fd)){
parseChunk();
}
}
if (ready4data){
if (!inited){
//we are ready, connect the socket!
if (!ss.connect(streamname.c_str())){
ss = DDV_OpenUnix(streamname.c_str());
if (ss <= 0){
#ifdef DEBUG
fprintf(stderr, "Could not connect to server!\n");
#endif
return 0;
}
FLV_Readheader(ss);//read the header, we don't want it
#ifdef DEBUG
fprintf(stderr, "Header read, starting to send video data...\n");
fprintf(stderr, "Everything connected, starting to send video data...\n");
#endif
inited = true;
}
//only send data if previous data has been ACK'ed...
if (snd_cnt - snd_window_at < snd_window_size){
if (FLV_GetPacket(ss)){//able to read a full packet?
ts = FLVbuffer[7] * 256*256*256;
ts += FLVbuffer[4] * 256*256;
ts += FLVbuffer[5] * 256;
ts += FLVbuffer[6];
//if (snd_cnt - snd_window_at < snd_window_size){
if (FLV_GetPacket(tag, ss)){//able to read a full packet?
ts = tag->data[7] * 256*256*256;
ts += tag->data[4] * 256*256;
ts += tag->data[5] * 256;
ts += tag->data[6];
if (ts != 0){
if (fts == 0){fts = ts;ftst = getNowMS();}
ts -= fts;
FLVbuffer[7] = ts / (256*256*256);
FLVbuffer[4] = ts / (256*256);
FLVbuffer[5] = ts / 256;
FLVbuffer[6] = ts % 256;
tag->data[7] = ts / (256*256*256);
tag->data[4] = ts / (256*256);
tag->data[5] = ts / 256;
tag->data[6] = ts % 256;
ts += ftst;
}else{
ftst = getNowMS();
FLVbuffer[7] = ftst / (256*256*256);
FLVbuffer[4] = ftst / (256*256);
FLVbuffer[5] = ftst / 256;
FLVbuffer[6] = ftst % 256;
tag->data[7] = ftst / (256*256*256);
tag->data[4] = ftst / (256*256);
tag->data[5] = ftst / 256;
tag->data[6] = ftst % 256;
}
SendMedia((unsigned char)FLVbuffer[0], (unsigned char *)FLVbuffer+11, FLV_len-15, ts);
FLV_Dump();//dump packet and get ready for next
}
if ((SWBerr != SWBaseSocket::ok) && (SWBerr != SWBaseSocket::notReady)){
SendMedia((unsigned char)tag->data[0], (unsigned char *)tag->data+11, tag->len-15, ts);
#ifdef DEBUG
fprintf(stderr, "No more data! :-( (%s)\n", SWBerr.get_error().c_str());
fprintf(stderr, "Sent a tag to %i\n", CONN_fd);
#endif
return 0;//no more input possible! Fail immediately.
}
}
//}
}
//send ACK if we received a whole window
if (rec_cnt - rec_window_at > rec_window_size){
@ -112,8 +159,8 @@ int main(){
SendCTL(3, rec_cnt);//send ack (msg 3)
}
}
#ifdef DEBUG
fprintf(stderr, "User disconnected.\n");
#endif
//#ifdef DEBUG
fprintf(stderr, "User %i disconnected.\n", CONN_fd);
//#endif
return 0;
}//main

View file

@ -1,4 +1,4 @@
default: client-local-install
default: client-install
client:
cd Connector_HTTP; $(MAKE)
@ -13,19 +13,12 @@ client-clean:
#cd Connector_RTSP; $(MAKE) clean
cd Buffer; $(MAKE) clean
clean: client-clean
client-install: client
client-install: client-clean client
service xinetd stop
cp -f ./Connector_HTTP/Connector_HTTP /usr/bin/
cp -f ./Connector_RTMP/Connector_RTMP /usr/bin/
cd Connector_RTMP; $(MAKE) install
cp -f ./Connector_RAW/Connector_RAW /usr/bin/
#cp -f ./Connector_RTSP/Connector_RTSP /usr/bin/
cp -f ./Buffer/Buffer /usr/bin/
cp -f ./PLS /etc/xinetd.d/
service xinetd restart
client-local-install: client
mkdir -p ./bin
cp -f ./Connector_HTTP/Connector_HTTP ./bin/
cp -f ./Connector_RTMP/Connector_RTMP ./bin/
cp -f ./Connector_RTMP/Connector_RAW ./bin/
#cp -f ./Connector_RTSP/Connector_RTSP ./bin/
cp -f ./Buffer/Buffer ./bin/
service xinetd start

16
PLS
View file

@ -8,18 +8,8 @@ service ddvtechhttp
server = /usr/bin/Connector_HTTP
port = 7337
wait = no
}
service ddvtechrtmp
{
disable = no
type = UNLISTED
protocol = tcp
socket_type = stream
user = root
server = /usr/bin/Connector_RTMP
port = 1935
wait = no
per_source = 10
cps = 100 5
}
service ddvtechraw
@ -32,5 +22,7 @@ service ddvtechraw
server = /usr/bin/Connector_RAW
port = 3773
wait = no
per_source = 10
cps = 100 5
}

154
util/ddv_socket.cpp Normal file
View file

@ -0,0 +1,154 @@
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
bool socketError = false;
bool socketBlocking = false;
int DDV_OpenUnix(const char adres[], bool nonblock = false){
int s = socket(AF_UNIX, SOCK_STREAM, 0);
sockaddr_un addr;
addr.sun_family = AF_UNIX;
strcpy(addr.sun_path, adres);
int r = connect(s, (sockaddr*)&addr, sizeof(addr));
if (r == 0){
if (nonblock){
int flags = fcntl(s, F_GETFL, 0);
flags |= O_NONBLOCK;
fcntl(s, F_SETFL, flags);
}
return s;
}else{
close(s);
return 0;
}
}
int DDV_Listen(int port){
int s = socket(AF_INET, SOCK_STREAM, 0);
int on = 1;
setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(port);//port 8888
inet_pton(AF_INET, "0.0.0.0", &addr.sin_addr);//listen on all interfaces
int ret = bind(s, (sockaddr*)&addr, sizeof(addr));//bind to all interfaces, chosen port
if (ret == 0){
ret = listen(s, 100);//start listening, backlog of 100 allowed
if (ret == 0){
return s;
}else{
printf("Listen failed! Error: %s\n", strerror(errno));
close(s);
return 0;
}
}else{
printf("Binding failed! Error: %s\n", strerror(errno));
close(s);
return 0;
}
}
int DDV_Accept(int sock, bool nonblock = false){
int r = accept(sock, 0, 0);
if ((r > 0) && nonblock){
int flags = fcntl(r, F_GETFL, 0);
flags |= O_NONBLOCK;
fcntl(r, F_SETFL, flags);
}
return r;
}
bool DDV_write(void * buffer, int todo, int sock){
int sofar = 0;
socketBlocking = false;
while (sofar != todo){
int r = send(sock, (char*)buffer + sofar, todo-sofar, 0);
if (r <= 0){
switch (errno){
case EWOULDBLOCK: socketBlocking = true; break;
default:
socketError = true;
printf("Could not write! %s\n", strerror(errno));
return false;
break;
}
}
sofar += r;
}
return true;
}
bool DDV_ready(int sock){
char tmp;
int preflags = fcntl(sock, F_GETFL, 0);
int postflags = preflags | O_NONBLOCK;
fcntl(sock, F_SETFL, postflags);
int r = recv(sock, &tmp, 1, MSG_PEEK);
fcntl(sock, F_SETFL, preflags);
return (r == 1);
}
bool DDV_read(void * buffer, int todo, int sock){
int sofar = 0;
socketBlocking = false;
while (sofar != todo){
int r = recv(sock, (char*)buffer + sofar, todo-sofar, 0);
if (r <= 0){
switch (errno){
case EWOULDBLOCK: socketBlocking = true; break;
default:
socketError = true;
printf("Could not read! %s\n", strerror(errno));
return false;
break;
}
}
sofar += r;
}
return true;
}
bool DDV_read(void * buffer, int width, int count, int sock){return DDV_read(buffer, width*count, sock);}
bool DDV_write(void * buffer, int width, int count, int sock){return DDV_write(buffer, width*count, sock);}
int DDV_iwrite(void * buffer, int todo, int sock){
int r = send(sock, buffer, todo, 0);
if (r < 0){
switch (errno){
case EWOULDBLOCK: break;
default:
socketError = true;
printf("Could not write! %s\n", strerror(errno));
return false;
break;
}
}
return r;
}
int DDV_iread(void * buffer, int todo, int sock){
int r = recv(sock, buffer, todo, 0);
if (r < 0){
switch (errno){
case EWOULDBLOCK: break;
default:
socketError = true;
printf("Could not read! %s\n", strerror(errno));
return false;
break;
}
}
return r;
}

View file

@ -1,4 +1,5 @@
#include <unistd.h> //for read()
#include <fcntl.h>
struct FLV_Pack {
int len;
@ -8,42 +9,86 @@ struct FLV_Pack {
};//FLV_Pack
char FLVHeader[13];
bool All_Hell_Broke_Loose = false;
//reads full length from a file descriptor
void Magic_Read(char * buf, int len, int file){
int i = 0;
while (i < len) i += read(file, buf, len-i);
}
//reads a FLV header and checks for correctness
//checks FLV Header for correctness
//returns true if everything is alright, false otherwise
bool FLV_Readheader(){
fread(FLVHeader,1,13,stdin);
if (FLVHeader[0] != 'F') return false;
if (FLVHeader[1] != 'L') return false;
if (FLVHeader[2] != 'V') return false;
if (FLVHeader[8] != 0x09) return false;
if (FLVHeader[9] != 0) return false;
if (FLVHeader[10] != 0) return false;
if (FLVHeader[11] != 0) return false;
if (FLVHeader[12] != 0) return false;
bool FLV_Checkheader(char * header){
if (header[0] != 'F') return false;
if (header[1] != 'L') return false;
if (header[2] != 'V') return false;
if (header[8] != 0x09) return false;
if (header[9] != 0) return false;
if (header[10] != 0) return false;
if (header[11] != 0) return false;
if (header[12] != 0) return false;
return true;
}//FLV_Readheader
}//FLV_Checkheader
//returns true if header is an FLV header
bool FLV_Isheader(char * header){
if (header[0] != 'F') return false;
if (header[1] != 'L') return false;
if (header[2] != 'V') return false;
return true;
}//FLV_Isheader
bool ReadUntil(char * buffer, unsigned int count, unsigned int & sofar){
if (sofar >= count){return true;}
int r = 0;
r = fread(buffer + sofar,1,count-sofar,stdin);
if (r < 0){All_Hell_Broke_Loose = true; return false;}
sofar += r;
if (sofar >= count){return true;}
return false;
}
//gets a packet, storing in given FLV_Pack pointer.
//will assign pointer if null
//resizes FLV_Pack data field bigger if data doesn't fit
// (does not auto-shrink for speed!)
void FLV_GetPacket(FLV_Pack *& p){
bool FLV_GetPacket(FLV_Pack *& p){
int preflags = fcntl(fileno(stdin), F_GETFL, 0);
int postflags = preflags | O_NONBLOCK;
fcntl(fileno(stdin), F_SETFL, postflags);
static bool done = true;
static unsigned int sofar = 0;
if (!p){p = (FLV_Pack*)calloc(1, sizeof(FLV_Pack));}
if (p->buf < 15){p->data = (char*)realloc(p->data, 15); p->buf = 15;}
fread(p->data,1,11,stdin);
if (done){
//read a header
if (ReadUntil(p->data, 11, sofar)){
//if its a correct FLV header, throw away and read tag header
if (FLV_Isheader(p->data)){
if (ReadUntil(p->data, 13, sofar)){
if (FLV_Checkheader(p->data)){
sofar = 0;
memcpy(FLVHeader, p->data, 13);
}else{All_Hell_Broke_Loose = true;}
}
}else{
//if a tag header, calculate length and read tag body
p->len = p->data[3] + 15;
p->len += (p->data[2] << 8);
p->len += (p->data[1] << 16);
if (p->buf < p->len){p->data = (char*)realloc(p->data, p->len);p->buf = p->len;}
fread(p->data+11,1,p->len-11,stdin);
done = false;
}
}
}else{
//read tag body
if (ReadUntil(p->data, p->len, sofar)){
//calculate keyframeness, next time read header again, return true
p->isKeyframe = false;
if ((p->data[0] == 0x09) && (((p->data[11] & 0xf0) >> 4) == 1)){p->isKeyframe = true;}
done = true;
sofar = 0;
fcntl(fileno(stdin), F_SETFL, preflags);
return true;
}
}
fcntl(fileno(stdin), F_SETFL, preflags);
return false;
}//FLV_GetPacket

View file

@ -1,30 +1,92 @@
SWBaseSocket::SWBaseError SWBerr;
char * FLVbuffer;
int FLV_len;
int FLVbs = 0;
void FLV_Readheader(SWUnixSocket & ss){
static char header[13];
while (ss.frecv(header, 13, &SWBerr) != 13){
//wait
struct FLV_Pack {
int len;
int buf;
bool isKeyframe;
char * data;
};//FLV_Pack
char FLVHeader[13];
bool All_Hell_Broke_Loose = false;
//checks FLV Header for correctness
//returns true if everything is alright, false otherwise
bool FLV_Checkheader(char * header){
if (header[0] != 'F') return false;
if (header[1] != 'L') return false;
if (header[2] != 'V') return false;
if (header[8] != 0x09) return false;
if (header[9] != 0) return false;
if (header[10] != 0) return false;
if (header[11] != 0) return false;
if (header[12] != 0) return false;
return true;
}//FLV_Checkheader
//returns true if header is an FLV header
bool FLV_Isheader(char * header){
if (header[0] != 'F') return false;
if (header[1] != 'L') return false;
if (header[2] != 'V') return false;
return true;
}//FLV_Isheader
bool ReadUntil(char * buffer, unsigned int count, unsigned int & sofar, int sock){
if (sofar >= count){return true;}
int r = 0;
r = DDV_iread(buffer + sofar,count-sofar,sock);
if (r < 0){All_Hell_Broke_Loose = true; return false;}
sofar += r;
if (sofar >= count){return true;}
return false;
}
}//FLV_Readheader
void FLV_Dump(){FLV_len = 0;}
//gets a packet, storing in given FLV_Pack pointer.
//will assign pointer if null
//resizes FLV_Pack data field bigger if data doesn't fit
// (does not auto-shrink for speed!)
bool FLV_GetPacket(FLV_Pack *& p, int sock){
int preflags = fcntl(sock, F_GETFL, 0);
int postflags = preflags | O_NONBLOCK;
fcntl(sock, F_SETFL, postflags);
static bool done = true;
static unsigned int sofar = 0;
if (!p){p = (FLV_Pack*)calloc(1, sizeof(FLV_Pack));}
if (p->buf < 15){p->data = (char*)realloc(p->data, 15); p->buf = 15;}
bool FLV_GetPacket(SWUnixSocket & ss){
if (FLVbs < 15){FLVbuffer = (char*)realloc(FLVbuffer, 15); FLVbs = 15;}
//if received a whole header, receive a whole packet
//if not, retry header next pass
if (FLV_len == 0){
if (ss.frecv(FLVbuffer, 11, &SWBerr) == 11){
FLV_len = FLVbuffer[3] + 15;
FLV_len += (FLVbuffer[2] << 8);
FLV_len += (FLVbuffer[1] << 16);
if (FLVbs < FLV_len){FLVbuffer = (char*)realloc(FLVbuffer, FLV_len);FLVbs = FLV_len;}
if (done){
//read a header
if (ReadUntil(p->data, 11, sofar, sock)){
//if its a correct FLV header, throw away and read tag header
if (FLV_Isheader(p->data)){
if (ReadUntil(p->data, 13, sofar, sock)){
if (FLV_Checkheader(p->data)){
sofar = 0;
memcpy(FLVHeader, p->data, 13);
}else{All_Hell_Broke_Loose = true;}
}
}else{
if (ss.frecv(FLVbuffer+11, FLV_len-11, &SWBerr) == FLV_len-11){return true;}
//if a tag header, calculate length and read tag body
p->len = p->data[3] + 15;
p->len += (p->data[2] << 8);
p->len += (p->data[1] << 16);
if (p->buf < p->len){p->data = (char*)realloc(p->data, p->len);p->buf = p->len;}
done = false;
}
}
}else{
//read tag body
if (ReadUntil(p->data, p->len, sofar, sock)){
//calculate keyframeness, next time read header again, return true
p->isKeyframe = false;
if ((p->data[0] == 0x09) && (((p->data[11] & 0xf0) >> 4) == 1)){p->isKeyframe = true;}
done = true;
sofar = 0;
fcntl(sock, F_SETFL, preflags);
return true;
}
}
fcntl(sock, F_SETFL, preflags);
return false;
}//FLV_GetPacket