First version of standalone RTMP connector

This commit is contained in:
Thulinma 2010-11-07 22:48:05 +01:00
parent 931bbcced1
commit e04bb3efde
4 changed files with 145 additions and 84 deletions

View file

@ -108,18 +108,18 @@ void SendChunk(chunkpack ch){
} }
} }
if (ch.cs_id <= 63){ if (ch.cs_id <= 63){
tmp = chtype | ch.cs_id; fwrite(&tmp, 1, 1, stdout); tmp = chtype | ch.cs_id; fwrite(&tmp, 1, 1, CONN);
snd_cnt+=1; snd_cnt+=1;
}else{ }else{
if (ch.cs_id <= 255+64){ if (ch.cs_id <= 255+64){
tmp = chtype | 0; fwrite(&tmp, 1, 1, stdout); tmp = chtype | 0; fwrite(&tmp, 1, 1, CONN);
tmp = ch.cs_id - 64; fwrite(&tmp, 1, 1, stdout); tmp = ch.cs_id - 64; fwrite(&tmp, 1, 1, CONN);
snd_cnt+=2; snd_cnt+=2;
}else{ }else{
tmp = chtype | 1; fwrite(&tmp, 1, 1, stdout); tmp = chtype | 1; fwrite(&tmp, 1, 1, CONN);
tmpi = ch.cs_id - 64; tmpi = ch.cs_id - 64;
tmp = tmpi % 256; fwrite(&tmp, 1, 1, stdout); tmp = tmpi % 256; fwrite(&tmp, 1, 1, CONN);
tmp = tmpi / 256; fwrite(&tmp, 1, 1, stdout); tmp = tmpi / 256; fwrite(&tmp, 1, 1, CONN);
snd_cnt+=3; snd_cnt+=3;
} }
} }
@ -129,67 +129,67 @@ void SendChunk(chunkpack ch){
if (chtype == 0x00){ if (chtype == 0x00){
tmpi = ch.timestamp; tmpi = ch.timestamp;
if (tmpi >= 0x00ffffff){ntime = tmpi; tmpi = 0x00ffffff;} if (tmpi >= 0x00ffffff){ntime = tmpi; tmpi = 0x00ffffff;}
tmp = tmpi / (256*256); fwrite(&tmp, 1, 1, stdout); tmp = tmpi / (256*256); fwrite(&tmp, 1, 1, CONN);
tmp = tmpi / 256; fwrite(&tmp, 1, 1, stdout); tmp = tmpi / 256; fwrite(&tmp, 1, 1, CONN);
tmp = tmpi % 256; fwrite(&tmp, 1, 1, stdout); tmp = tmpi % 256; fwrite(&tmp, 1, 1, CONN);
snd_cnt+=3; snd_cnt+=3;
}else{ }else{
tmpi = ch.timestamp - prev.timestamp; tmpi = ch.timestamp - prev.timestamp;
if (tmpi >= 0x00ffffff){ntime = tmpi; tmpi = 0x00ffffff;} if (tmpi >= 0x00ffffff){ntime = tmpi; tmpi = 0x00ffffff;}
tmp = tmpi / (256*256); fwrite(&tmp, 1, 1, stdout); tmp = tmpi / (256*256); fwrite(&tmp, 1, 1, CONN);
tmp = tmpi / 256; fwrite(&tmp, 1, 1, stdout); tmp = tmpi / 256; fwrite(&tmp, 1, 1, CONN);
tmp = tmpi % 256; fwrite(&tmp, 1, 1, stdout); tmp = tmpi % 256; fwrite(&tmp, 1, 1, CONN);
snd_cnt+=3; snd_cnt+=3;
} }
if (chtype != 0x80){ if (chtype != 0x80){
//len //len
tmpi = ch.len; tmpi = ch.len;
tmp = tmpi / (256*256); fwrite(&tmp, 1, 1, stdout); tmp = tmpi / (256*256); fwrite(&tmp, 1, 1, CONN);
tmp = tmpi / 256; fwrite(&tmp, 1, 1, stdout); tmp = tmpi / 256; fwrite(&tmp, 1, 1, CONN);
tmp = tmpi % 256; fwrite(&tmp, 1, 1, stdout); tmp = tmpi % 256; fwrite(&tmp, 1, 1, CONN);
snd_cnt+=3; snd_cnt+=3;
//msg type id //msg type id
tmp = ch.msg_type_id; fwrite(&tmp, 1, 1, stdout); tmp = ch.msg_type_id; fwrite(&tmp, 1, 1, CONN);
snd_cnt+=1; snd_cnt+=1;
if (chtype != 0x40){ if (chtype != 0x40){
//msg stream id //msg stream id
tmp = ch.msg_stream_id % 256; fwrite(&tmp, 1, 1, stdout); tmp = ch.msg_stream_id % 256; fwrite(&tmp, 1, 1, CONN);
tmp = ch.msg_stream_id / 256; fwrite(&tmp, 1, 1, stdout); tmp = ch.msg_stream_id / 256; fwrite(&tmp, 1, 1, CONN);
tmp = ch.msg_stream_id / (256*256); fwrite(&tmp, 1, 1, stdout); tmp = ch.msg_stream_id / (256*256); fwrite(&tmp, 1, 1, CONN);
tmp = ch.msg_stream_id / (256*256*256); fwrite(&tmp, 1, 1, stdout); tmp = ch.msg_stream_id / (256*256*256); fwrite(&tmp, 1, 1, CONN);
snd_cnt+=4; snd_cnt+=4;
} }
} }
} }
//support for 0x00ffffff timestamps //support for 0x00ffffff timestamps
if (ntime){ if (ntime){
tmp = ntime / (256*256*256); fwrite(&tmp, 1, 1, stdout); tmp = ntime / (256*256*256); fwrite(&tmp, 1, 1, CONN);
tmp = ntime / (256*256); fwrite(&tmp, 1, 1, stdout); tmp = ntime / (256*256); fwrite(&tmp, 1, 1, CONN);
tmp = ntime / 256; fwrite(&tmp, 1, 1, stdout); tmp = ntime / 256; fwrite(&tmp, 1, 1, CONN);
tmp = ntime % 256; fwrite(&tmp, 1, 1, stdout); tmp = ntime % 256; fwrite(&tmp, 1, 1, CONN);
snd_cnt+=4; snd_cnt+=4;
} }
ch.len_left = 0; ch.len_left = 0;
while (ch.len_left < ch.len){ while (ch.len_left < ch.len){
tmpi = ch.len - ch.len_left; tmpi = ch.len - ch.len_left;
if (tmpi > chunk_snd_max){tmpi = chunk_snd_max;} if (tmpi > chunk_snd_max){tmpi = chunk_snd_max;}
fwrite((ch.data + ch.len_left), 1, tmpi, stdout); fwrite((ch.data + ch.len_left), 1, tmpi, CONN);
snd_cnt+=tmpi; snd_cnt+=tmpi;
ch.len_left += tmpi; ch.len_left += tmpi;
if (ch.len_left < ch.len){ if (ch.len_left < ch.len){
if (ch.cs_id <= 63){ if (ch.cs_id <= 63){
tmp = 0xC0 + ch.cs_id; fwrite(&tmp, 1, 1, stdout); tmp = 0xC0 + ch.cs_id; fwrite(&tmp, 1, 1, CONN);
snd_cnt+=1; snd_cnt+=1;
}else{ }else{
if (ch.cs_id <= 255+64){ if (ch.cs_id <= 255+64){
tmp = 0xC0; fwrite(&tmp, 1, 1, stdout); tmp = 0xC0; fwrite(&tmp, 1, 1, CONN);
tmp = ch.cs_id - 64; fwrite(&tmp, 1, 1, stdout); tmp = ch.cs_id - 64; fwrite(&tmp, 1, 1, CONN);
snd_cnt+=2; snd_cnt+=2;
}else{ }else{
tmp = 0xC1; fwrite(&tmp, 1, 1, stdout); tmp = 0xC1; fwrite(&tmp, 1, 1, CONN);
tmpi = ch.cs_id - 64; tmpi = ch.cs_id - 64;
tmp = tmpi % 256; fwrite(&tmp, 1, 1, stdout); tmp = tmpi % 256; fwrite(&tmp, 1, 1, CONN);
tmp = tmpi / 256; fwrite(&tmp, 1, 1, stdout); tmp = tmpi / 256; fwrite(&tmp, 1, 1, CONN);
snd_cnt+=4; snd_cnt+=4;
} }
} }
@ -310,19 +310,19 @@ struct chunkpack getChunk(){
gettimeofday(&lastrec, 0); gettimeofday(&lastrec, 0);
struct chunkpack ret; struct chunkpack ret;
unsigned char temp; unsigned char temp;
fread(&(ret.chunktype), 1, 1, stdin); fread(&(ret.chunktype), 1, 1, CONN);
rec_cnt++; rec_cnt++;
//read the chunkstream ID properly //read the chunkstream ID properly
switch (ret.chunktype & 0x3F){ switch (ret.chunktype & 0x3F){
case 0: case 0:
fread(&temp, 1, 1, stdin); fread(&temp, 1, 1, CONN);
rec_cnt++; rec_cnt++;
ret.cs_id = temp + 64; ret.cs_id = temp + 64;
break; break;
case 1: case 1:
fread(&temp, 1, 1, stdin); fread(&temp, 1, 1, CONN);
ret.cs_id = temp + 64; ret.cs_id = temp + 64;
fread(&temp, 1, 1, stdin); fread(&temp, 1, 1, CONN);
ret.cs_id += temp * 256; ret.cs_id += temp * 256;
rec_cnt+=2; rec_cnt+=2;
break; break;
@ -334,57 +334,57 @@ struct chunkpack getChunk(){
//process the rest of the header, for each chunk type //process the rest of the header, for each chunk type
switch (ret.chunktype & 0xC0){ switch (ret.chunktype & 0xC0){
case 0x00: case 0x00:
fread(&temp, 1, 1, stdin); fread(&temp, 1, 1, CONN);
ret.timestamp = temp*256*256; ret.timestamp = temp*256*256;
fread(&temp, 1, 1, stdin); fread(&temp, 1, 1, CONN);
ret.timestamp += temp*256; ret.timestamp += temp*256;
fread(&temp, 1, 1, stdin); fread(&temp, 1, 1, CONN);
ret.timestamp += temp; ret.timestamp += temp;
fread(&temp, 1, 1, stdin); fread(&temp, 1, 1, CONN);
ret.len = temp*256*256; ret.len = temp*256*256;
fread(&temp, 1, 1, stdin); fread(&temp, 1, 1, CONN);
ret.len += temp*256; ret.len += temp*256;
fread(&temp, 1, 1, stdin); fread(&temp, 1, 1, CONN);
ret.len += temp; ret.len += temp;
ret.len_left = 0; ret.len_left = 0;
fread(&temp, 1, 1, stdin); fread(&temp, 1, 1, CONN);
ret.msg_type_id = temp; ret.msg_type_id = temp;
fread(&temp, 1, 1, stdin); fread(&temp, 1, 1, CONN);
ret.msg_stream_id = temp; ret.msg_stream_id = temp;
fread(&temp, 1, 1, stdin); fread(&temp, 1, 1, CONN);
ret.msg_stream_id += temp*256; ret.msg_stream_id += temp*256;
fread(&temp, 1, 1, stdin); fread(&temp, 1, 1, CONN);
ret.msg_stream_id += temp*256*256; ret.msg_stream_id += temp*256*256;
fread(&temp, 1, 1, stdin); fread(&temp, 1, 1, CONN);
ret.msg_stream_id += temp*256*256*256; ret.msg_stream_id += temp*256*256*256;
rec_cnt+=11; rec_cnt+=11;
break; break;
case 0x40: case 0x40:
fread(&temp, 1, 1, stdin); fread(&temp, 1, 1, CONN);
ret.timestamp = temp*256*256; ret.timestamp = temp*256*256;
fread(&temp, 1, 1, stdin); fread(&temp, 1, 1, CONN);
ret.timestamp += temp*256; ret.timestamp += temp*256;
fread(&temp, 1, 1, stdin); fread(&temp, 1, 1, CONN);
ret.timestamp += temp; ret.timestamp += temp;
ret.timestamp += prev.timestamp; ret.timestamp += prev.timestamp;
fread(&temp, 1, 1, stdin); fread(&temp, 1, 1, CONN);
ret.len = temp*256*256; ret.len = temp*256*256;
fread(&temp, 1, 1, stdin); fread(&temp, 1, 1, CONN);
ret.len += temp*256; ret.len += temp*256;
fread(&temp, 1, 1, stdin); fread(&temp, 1, 1, CONN);
ret.len += temp; ret.len += temp;
ret.len_left = 0; ret.len_left = 0;
fread(&temp, 1, 1, stdin); fread(&temp, 1, 1, CONN);
ret.msg_type_id = temp; ret.msg_type_id = temp;
ret.msg_stream_id = prev.msg_stream_id; ret.msg_stream_id = prev.msg_stream_id;
rec_cnt+=7; rec_cnt+=7;
break; break;
case 0x80: case 0x80:
fread(&temp, 1, 1, stdin); fread(&temp, 1, 1, CONN);
ret.timestamp = temp*256*256; ret.timestamp = temp*256*256;
fread(&temp, 1, 1, stdin); fread(&temp, 1, 1, CONN);
ret.timestamp += temp*256; ret.timestamp += temp*256;
fread(&temp, 1, 1, stdin); fread(&temp, 1, 1, CONN);
ret.timestamp += temp; ret.timestamp += temp;
ret.timestamp += prev.timestamp; ret.timestamp += prev.timestamp;
ret.len = prev.len; ret.len = prev.len;
@ -414,20 +414,20 @@ struct chunkpack getChunk(){
} }
//read extended timestamp, if neccesary //read extended timestamp, if neccesary
if (ret.timestamp == 0x00ffffff){ if (ret.timestamp == 0x00ffffff){
fread(&temp, 1, 1, stdin); fread(&temp, 1, 1, CONN);
ret.timestamp = temp*256*256*256; ret.timestamp = temp*256*256*256;
fread(&temp, 1, 1, stdin); fread(&temp, 1, 1, CONN);
ret.timestamp += temp*256*256; ret.timestamp += temp*256*256;
fread(&temp, 1, 1, stdin); fread(&temp, 1, 1, CONN);
ret.timestamp += temp*256; ret.timestamp += temp*256;
fread(&temp, 1, 1, stdin); fread(&temp, 1, 1, CONN);
ret.timestamp += temp; ret.timestamp += temp;
rec_cnt+=4; rec_cnt+=4;
} }
//read data if length > 0, and allocate it //read data if length > 0, and allocate it
if (ret.real_len > 0){ if (ret.real_len > 0){
ret.data = (unsigned char*)malloc(ret.real_len); ret.data = (unsigned char*)malloc(ret.real_len);
fread(ret.data, 1, ret.real_len, stdin); fread(ret.data, 1, ret.real_len, CONN);
rec_cnt+=ret.real_len; rec_cnt+=ret.real_len;
}else{ }else{
ret.data = 0; ret.data = 0;
@ -493,7 +493,7 @@ chunkpack getWholeChunk(){
free(ret);//cleanup returned chunk free(ret);//cleanup returned chunk
return gwc_complete; return gwc_complete;
} }
if (feof(stdin) != 0){break;} if (feof(CONN) != 0){break;}
counter++; counter++;
} }
gwc_complete.msg_type_id = 0; gwc_complete.msg_type_id = 0;

View file

@ -14,11 +14,11 @@ bool doHandshake(){
Handshake Client; Handshake Client;
Handshake Server; Handshake Server;
/** Read C0 **/ /** Read C0 **/
fread(&(Version), 1, 1, stdin); fread(&(Version), 1, 1, CONN);
/** Read C1 **/ /** Read C1 **/
fread(Client.Time, 1, 4, stdin); fread(Client.Time, 1, 4, CONN);
fread(Client.Zero, 1, 4, stdin); fread(Client.Zero, 1, 4, CONN);
fread(Client.Random, 1, 1528, stdin); fread(Client.Random, 1, 1528, CONN);
rec_cnt+=1537; rec_cnt+=1537;
/** Build S1 Packet **/ /** Build S1 Packet **/
Server.Time[0] = 0; Server.Time[1] = 0; Server.Time[2] = 0; Server.Time[3] = 0; Server.Time[0] = 0; Server.Time[1] = 0; Server.Time[2] = 0; Server.Time[3] = 0;
@ -27,25 +27,25 @@ bool doHandshake(){
Server.Random[i] = versionstring[i%13]; Server.Random[i] = versionstring[i%13];
} }
/** Send S0 **/ /** Send S0 **/
fwrite(&(Version), 1, 1, stdout); fwrite(&(Version), 1, 1, CONN);
/** Send S1 **/ /** Send S1 **/
fwrite(Server.Time, 1, 4, stdout); fwrite(Server.Time, 1, 4, CONN);
fwrite(Server.Zero, 1, 4, stdout); fwrite(Server.Zero, 1, 4, CONN);
fwrite(Server.Random, 1, 1528, stdout); fwrite(Server.Random, 1, 1528, CONN);
/** Flush output, just for certainty **/ /** Flush output, just for certainty **/
fflush(stdout); fflush(CONN);
snd_cnt+=1537; snd_cnt+=1537;
/** Send S2 **/ /** Send S2 **/
fwrite(Client.Time, 1, 4, stdout); fwrite(Client.Time, 1, 4, CONN);
fwrite(Client.Time, 1, 4, stdout); fwrite(Client.Time, 1, 4, CONN);
fwrite(Client.Random, 1, 1528, stdout); fwrite(Client.Random, 1, 1528, CONN);
snd_cnt+=1536; snd_cnt+=1536;
/** Flush, necessary in order to work **/ /** Flush, necessary in order to work **/
fflush(stdout); fflush(CONN);
/** Read and discard C2 **/ /** Read and discard C2 **/
fread(Client.Time, 1, 4, stdin); fread(Client.Time, 1, 4, CONN);
fread(Client.Zero, 1, 4, stdin); fread(Client.Zero, 1, 4, CONN);
fread(Client.Random, 1, 1528, stdin); fread(Client.Random, 1, 1528, CONN);
rec_cnt+=1536; rec_cnt+=1536;
return true; return true;
}//doHandshake }//doHandshake
@ -57,10 +57,10 @@ bool doHandshake(){
bool doHandshake(){ bool doHandshake(){
char Version; char Version;
/** Read C0 **/ /** Read C0 **/
fread(&Version, 1, 1, stdin); fread(&Version, 1, 1, CONN);
uint8_t Client[1536]; uint8_t Client[1536];
uint8_t Server[3072]; uint8_t Server[3072];
fread(&Client, 1, 1536, stdin); fread(&Client, 1, 1536, CONN);
rec_cnt+=1537; rec_cnt+=1537;
/** Build S1 Packet **/ /** Build S1 Packet **/
@ -123,13 +123,13 @@ bool doHandshake(){
delete[] pLastHash; delete[] pLastHash;
//***** DONE BUILDING THE RESPONSE ***// //***** DONE BUILDING THE RESPONSE ***//
/** Send response **/ /** Send response **/
fwrite(&Version, 1, 1, stdout); fwrite(&Version, 1, 1, CONN);
fwrite(&Server, 1, 3072, stdout); fwrite(&Server, 1, 3072, CONN);
snd_cnt+=3073; snd_cnt+=3073;
/** Flush, necessary in order to work **/ /** Flush, necessary in order to work **/
fflush(stdout); fflush(CONN);
/** Read and discard C2 **/ /** Read and discard C2 **/
fread(Client, 1, 1536, stdin); fread(Client, 1, 1536, CONN);
rec_cnt+=1536; rec_cnt+=1536;
return true; return true;
} }

View file

@ -3,6 +3,7 @@
#include <cstdlib> #include <cstdlib>
#include <cstdio> #include <cstdio>
#include <cmath> #include <cmath>
#include <unistd.h>
//for connection to server //for connection to server
#include "../sockets/SocketW.h" #include "../sockets/SocketW.h"
@ -11,11 +12,30 @@ bool inited = false;
bool stopparsing = false; bool stopparsing = false;
timeval lastrec; timeval lastrec;
int CONN = 0;
#include "parsechunks.cpp" //chunkstream parsing #include "parsechunks.cpp" //chunkstream parsing
#include "handshake.cpp" //handshaking #include "handshake.cpp" //handshaking
#include "../util/flv_sock.cpp" //FLV parsing with SocketW #include "../util/flv_sock.cpp" //FLV parsing with SocketW
#include "../util/ddv_socket.cpp" //DDVTech Socket wrapper
int main(){ int main(){
int server_socket = DDV_Listen(1935);
while (server_socket > 0){
CONN = DDV_Accept(server_socket);
pid_t myid = fork();
if (myid == 0){
break;
}else{
printf("Spawned new process %i for incoming client\n", (int)myid);
}
}
if (server_socket <= 0){
return 0;
}
unsigned int ts; unsigned int ts;
unsigned int fts = 0; unsigned int fts = 0;
unsigned int ftst; unsigned int ftst;

41
util/ddv_socket.cpp Normal file
View file

@ -0,0 +1,41 @@
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
int DDV_Listen(int port){
int s = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(port);//port 8888
inet_pton(AF_INET, "0.0.0.0", &addr.sin_addr);//listen on all interfaces
ret = bind(sock, (sockaddr*)&addr, sizeof(addr));//bind to all interfaces, chosen port
if (ret == 0){
ret = listen(sock, 100);//start listening, backlog of 100 allowed
if (ret == 0){
return s;
}else{
printf("Listen failed! Error: %s\n", strerror(errno));
close(s);
return 0;
}
}else{
printf("Binding failed! Error: %s\n", strerror(errno));
close(s);
return 0;
}
}
int DDV_Accept(int sock){
int r = accept(sock, 0, 0);
if (r != -1){
return fdopen(r, "r+");
}else{
return -1;
}
}