First multithreaded version of the Buffer process.
This commit is contained in:
parent
e3ecdb1e4b
commit
4cd8641e50
5 changed files with 1074 additions and 71 deletions
|
@ -1,4 +1,4 @@
|
|||
SRC = main.cpp ../util/json.cpp ../util/socket.cpp ../util/dtsc.cpp
|
||||
SRC = main.cpp ../util/json.cpp ../util/socket.cpp ../util/dtsc.cpp ../util/tinythread.cpp
|
||||
OBJ = $(SRC:.cpp=.o)
|
||||
OUT = DDV_Buffer
|
||||
INCLUDES =
|
||||
|
@ -8,7 +8,7 @@ CCFLAGS = -Wall -Wextra -funsigned-char $(OPTIMIZE) -DDEBUG=$(DEBUG)
|
|||
CC = $(CROSS)g++
|
||||
LD = $(CROSS)ld
|
||||
AR = $(CROSS)ar
|
||||
LIBS =
|
||||
LIBS = -lpthread
|
||||
.SUFFIXES: .cpp
|
||||
.PHONY: clean default
|
||||
default: $(OUT)
|
||||
|
|
155
Buffer/main.cpp
155
Buffer/main.cpp
|
@ -15,6 +15,7 @@
|
|||
#include "../util/dtsc.h" //DTSC support
|
||||
#include "../util/socket.h" //Socket lib
|
||||
#include "../util/json.h"
|
||||
#include "../util/tinythread.h"
|
||||
|
||||
/// Holds all code unique to the Buffer.
|
||||
namespace Buffer{
|
||||
|
@ -38,7 +39,9 @@ namespace Buffer{
|
|||
}
|
||||
|
||||
DTSC::Stream * Strm = 0;
|
||||
|
||||
std::string waiting_ip = ""; ///< IP address for media push.
|
||||
Socket::Connection ip_input; ///< Connection used for media push.
|
||||
|
||||
/// Converts a stats line to up, down, host, connector and conntime values.
|
||||
class Stats{
|
||||
public:
|
||||
|
@ -81,6 +84,7 @@ namespace Buffer{
|
|||
/// Keeps track of what buffer users are using and the connection status.
|
||||
class user{
|
||||
public:
|
||||
tthread::thread * Thread; ///< Holds the thread dealing with this user.
|
||||
DTSC::Ring * myRing; ///< Ring of the buffer for this user.
|
||||
int MyNum; ///< User ID of this user.
|
||||
std::string MyStr; ///< User ID of this user as a string.
|
||||
|
@ -104,6 +108,7 @@ namespace Buffer{
|
|||
curr_down = 0;
|
||||
currsend = 0;
|
||||
myRing = 0;
|
||||
Thread = 0;
|
||||
std::cout << "User " << MyNum << " connected" << std::endl;
|
||||
}//constructor
|
||||
/// Drops held DTSC::Ring class, if one is held.
|
||||
|
@ -113,8 +118,10 @@ namespace Buffer{
|
|||
/// Disconnects the current user. Doesn't do anything if already disconnected.
|
||||
/// Prints "Disconnected user" to stdout if disconnect took place.
|
||||
void Disconnect(std::string reason) {
|
||||
if (S.connected()) {
|
||||
S.close();
|
||||
if (S.connected()){S.close();}
|
||||
if (Thread != 0){
|
||||
if (Thread->joinable()){Thread->join();}
|
||||
Thread = 0;
|
||||
}
|
||||
Storage["curr"].removeMember(MyStr);
|
||||
Storage["log"][MyStr]["connector"] = lastStats.connector;
|
||||
|
@ -148,7 +155,7 @@ namespace Buffer{
|
|||
std::cout << "Warning: User was send corrupt video data and send to the next keyframe!" << std::endl;
|
||||
Strm->dropRing(myRing);
|
||||
myRing = Strm->getRing();
|
||||
}
|
||||
}
|
||||
currsend = 0;
|
||||
|
||||
//try to complete a send
|
||||
|
@ -162,6 +169,60 @@ namespace Buffer{
|
|||
};
|
||||
int user::UserCount = 0;
|
||||
|
||||
void handleUser(void * v_usr){
|
||||
user * usr = (user*)v_usr;
|
||||
std::cerr << "Thread launched for user " << usr->MyStr << ", socket number " << usr->S.getSocket() << std::endl;
|
||||
|
||||
usr->myRing = Strm->getRing();
|
||||
if (!usr->S.write(Strm->outHeader())){
|
||||
usr->Disconnect("failed to receive the header!");
|
||||
return;
|
||||
}
|
||||
|
||||
while (usr->S.connected()){
|
||||
if (usr->S.canRead()){
|
||||
std::string tmp = "";
|
||||
char charbuf;
|
||||
while ((usr->S.iread(&charbuf, 1) == 1) && charbuf != '\n' ){
|
||||
tmp += charbuf;
|
||||
}
|
||||
if (tmp != ""){
|
||||
if (tmp[0] == 'P'){
|
||||
std::cout << "Push attempt from IP " << tmp.substr(2) << std::endl;
|
||||
if (tmp.substr(2) == waiting_ip){
|
||||
if (!ip_input.connected()){
|
||||
std::cout << "Push accepted!" << std::endl;
|
||||
ip_input = usr->S;
|
||||
usr->S = Socket::Connection(-1);
|
||||
return;
|
||||
}else{
|
||||
usr->Disconnect("Push denied - push already in progress!");
|
||||
}
|
||||
}else{
|
||||
usr->Disconnect("Push denied - invalid IP address!");
|
||||
}
|
||||
}
|
||||
if (tmp[0] == 'S'){
|
||||
Stats tmpStats = Stats(tmp.substr(2));
|
||||
unsigned int secs = tmpStats.conntime - usr->lastStats.conntime;
|
||||
if (secs < 1){secs = 1;}
|
||||
usr->curr_up = (tmpStats.up - usr->lastStats.up) / secs;
|
||||
usr->curr_down = (tmpStats.down - usr->lastStats.down) / secs;
|
||||
usr->lastStats = tmpStats;
|
||||
Storage["curr"][usr->MyStr]["connector"] = tmpStats.connector;
|
||||
Storage["curr"][usr->MyStr]["up"] = tmpStats.up;
|
||||
Storage["curr"][usr->MyStr]["down"] = tmpStats.down;
|
||||
Storage["curr"][usr->MyStr]["conntime"] = tmpStats.conntime;
|
||||
Storage["curr"][usr->MyStr]["host"] = tmpStats.host;
|
||||
Storage["curr"][usr->MyStr]["start"] = (unsigned int) time(0) - tmpStats.conntime;
|
||||
}
|
||||
}
|
||||
}
|
||||
usr->Send();
|
||||
}
|
||||
usr->Disconnect("Closed");
|
||||
}
|
||||
|
||||
/// Starts a loop, waiting for connections to send data to.
|
||||
int Start(int argc, char ** argv) {
|
||||
//first make sure no segpipe signals will kill us
|
||||
|
@ -176,9 +237,7 @@ namespace Buffer{
|
|||
std::cout << "usage: " << argv[0] << " streamName [awaiting_IP]" << std::endl;
|
||||
return 1;
|
||||
}
|
||||
std::string waiting_ip = "";
|
||||
bool ip_waiting = false;
|
||||
Socket::Connection ip_input;
|
||||
if (argc >= 4){
|
||||
waiting_ip += argv[2];
|
||||
ip_waiting = true;
|
||||
|
@ -201,10 +260,9 @@ namespace Buffer{
|
|||
Socket::Connection std_input(fileno(stdin));
|
||||
Socket::Connection StatsSocket = Socket::Connection("/tmp/ddv_statistics", true);
|
||||
|
||||
Storage["log"] = JSON::Value();
|
||||
Storage["curr"] = JSON::Value();
|
||||
Storage["totals"] = JSON::Value();
|
||||
|
||||
Storage["log"].null();
|
||||
Storage["curr"].null();
|
||||
Storage["totals"].null();
|
||||
|
||||
while (!feof(stdin) || ip_waiting){
|
||||
usleep(1000); //sleep for 1 ms, to prevent 100% CPU time
|
||||
|
@ -248,79 +306,40 @@ namespace Buffer{
|
|||
}
|
||||
|
||||
//check for new connections, accept them if there are any
|
||||
incoming = SS.accept(true);
|
||||
//starts a thread for every accepted connection
|
||||
incoming = SS.accept(false);
|
||||
if (incoming.connected()){
|
||||
std::cerr << "New socket: " << incoming.getSocket() << std::endl;
|
||||
users.push_back(incoming);
|
||||
//send the header
|
||||
users.back().myRing = Strm->getRing();
|
||||
if (!users.back().S.write(Strm->outHeader())){
|
||||
/// \todo Do this more nicely?
|
||||
users.back().Disconnect("failed to receive the header!");
|
||||
user * usr_ptr = &(users.back());
|
||||
usr_ptr->Thread = new tthread::thread(handleUser, (void *)usr_ptr);
|
||||
}
|
||||
|
||||
//erase disconnected users
|
||||
if (users.size() > 0){
|
||||
for (usersIt = users.begin(); usersIt != users.end(); usersIt++){
|
||||
if (!(*usersIt).S.connected()){users.erase(usersIt); break;}
|
||||
}
|
||||
}
|
||||
|
||||
//go through all users
|
||||
if (users.size() > 0){
|
||||
for (usersIt = users.begin(); usersIt != users.end(); usersIt++){
|
||||
//remove disconnected users
|
||||
if (!(*usersIt).S.connected()){
|
||||
(*usersIt).Disconnect("Closed");
|
||||
users.erase(usersIt); break;
|
||||
}else{
|
||||
if ((*usersIt).S.canRead()){
|
||||
std::string tmp = "";
|
||||
char charbuf;
|
||||
while (((*usersIt).S.iread(&charbuf, 1) == 1) && charbuf != '\n' ){
|
||||
tmp += charbuf;
|
||||
}
|
||||
if (tmp != ""){
|
||||
if (tmp[0] == 'P'){
|
||||
std::cout << "Push attempt from IP " << tmp.substr(2) << std::endl;
|
||||
if (tmp.substr(2) == waiting_ip){
|
||||
if (!ip_input.connected()){
|
||||
std::cout << "Push accepted!" << std::endl;
|
||||
ip_input = (*usersIt).S;
|
||||
users.erase(usersIt);
|
||||
break;
|
||||
}else{
|
||||
(*usersIt).Disconnect("Push denied - push already in progress!");
|
||||
}
|
||||
}else{
|
||||
(*usersIt).Disconnect("Push denied - invalid IP address!");
|
||||
}
|
||||
}
|
||||
if (tmp[0] == 'S'){
|
||||
Stats tmpStats = Stats(tmp.substr(2));
|
||||
unsigned int secs = tmpStats.conntime - (*usersIt).lastStats.conntime;
|
||||
if (secs < 1){secs = 1;}
|
||||
(*usersIt).curr_up = (tmpStats.up - (*usersIt).lastStats.up) / secs;
|
||||
(*usersIt).curr_down = (tmpStats.down - (*usersIt).lastStats.down) / secs;
|
||||
(*usersIt).lastStats = tmpStats;
|
||||
Storage["curr"][(*usersIt).MyStr]["connector"] = tmpStats.connector;
|
||||
Storage["curr"][(*usersIt).MyStr]["up"] = tmpStats.up;
|
||||
Storage["curr"][(*usersIt).MyStr]["down"] = tmpStats.down;
|
||||
Storage["curr"][(*usersIt).MyStr]["conntime"] = tmpStats.conntime;
|
||||
Storage["curr"][(*usersIt).MyStr]["host"] = tmpStats.host;
|
||||
Storage["curr"][(*usersIt).MyStr]["start"] = (unsigned int) time(0) - tmpStats.conntime;
|
||||
}
|
||||
}
|
||||
}
|
||||
(*usersIt).Send();
|
||||
}
|
||||
}
|
||||
}
|
||||
}//main loop
|
||||
|
||||
// disconnect listener
|
||||
/// \todo Deal with EOF more nicely - doesn't send the end of the stream to all users!
|
||||
std::cout << "Reached EOF of input" << std::endl;
|
||||
SS.close();
|
||||
|
||||
while (users.size() > 0){
|
||||
for (usersIt = users.begin(); usersIt != users.end(); usersIt++){
|
||||
(*usersIt).Disconnect("Shutting down...");
|
||||
if (!(*usersIt).S.connected()){users.erase(usersIt);break;}
|
||||
if ((*usersIt).S.connected()){
|
||||
(*usersIt).Disconnect("Terminating...");
|
||||
}else{
|
||||
users.erase(usersIt);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
delete Strm;
|
||||
return 0;
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue