Fixed some issues with buffers, fixed child reaping for all connectors

This commit is contained in:
Thulinma 2011-10-07 03:08:07 +02:00
parent 6c2ed0e93c
commit 0f692ff998
2 changed files with 33 additions and 29 deletions

View file

@ -169,14 +169,15 @@ namespace Buffer{
bool gotData = false; bool gotData = false;
while((!feof(stdin) || ip_waiting) && !FLV::Parse_Error){ while((!feof(stdin) || ip_waiting) && !FLV::Parse_Error){
usleep(1000); //sleep for 1 ms, to prevent 100% CPU time
//invalidate the current buffer //invalidate the current buffer
ringbuf[current_buffer]->number = -1; ringbuf[current_buffer]->number = -1;
if ( if (
(!ip_waiting && (!ip_waiting &&
(std_input.canRead()) && ringbuf[current_buffer]->FLV.FileLoader(stdin) (std_input.canRead()) && ringbuf[current_buffer]->FLV.FileLoader(stdin)
) || (ip_waiting && (ip_input.connected()) && ) || (ip_waiting && (ip_input.connected()) &&
ringbuf[current_buffer]->FLV.SockLoader(ip_input) ringbuf[current_buffer]->FLV.SockLoader(ip_input)
) )
){ ){
loopcount++; loopcount++;
packtype = ringbuf[current_buffer]->FLV.data[0]; packtype = ringbuf[current_buffer]->FLV.data[0];
@ -256,32 +257,32 @@ namespace Buffer{
} }
} }
//send all connections what they need, if and when they need it //go through all users
if (users.size() > 0){ if (users.size() > 0){
for (usersIt = users.begin(); usersIt != users.end(); usersIt++){ for (usersIt = users.begin(); usersIt != users.end(); usersIt++){
//remove disconnected users
if (!(*usersIt).S.connected()){ if (!(*usersIt).S.connected()){
users.erase(usersIt); break; users.erase(usersIt); break;
}else{ }else{
if (!gotData && ip_waiting){ if ((*usersIt).S.canRead()){
if ((*usersIt).S.canRead()){ std::string tmp = "";
std::string tmp = ""; char charbuf;
char charbuf; while (((*usersIt).S.iread(&charbuf, 1) == 1) && charbuf != '\n' ){
while (((*usersIt).S.iread(&charbuf, 1) == 1) && charbuf != '\n' ){ tmp += charbuf;
tmp += charbuf; }
} if (tmp != ""){
if (tmp != ""){ std::cout << "Push attempt from IP " << tmp << std::endl;
std::cout << "Push attempt from IP " << tmp << std::endl; if (tmp == waiting_ip){
if (tmp == waiting_ip){ if (!ip_input.connected()){
if (!ip_input.connected()){ std::cout << "Push accepted!" << std::endl;
std::cout << "Push accepted!" << std::endl; ip_input = (*usersIt).S;
ip_input = (*usersIt).S; users.erase(usersIt);
users.erase(usersIt); break; break;
}else{
std::cout << "Push denied - push already in progress!" << std::endl;
}
}else{ }else{
std::cout << "Push denied!" << std::endl; (*usersIt).Disconnect("Push denied - push already in progress!");
} }
}else{
(*usersIt).Disconnect("Push denied - invalid IP address!");
} }
} }
} }
@ -311,5 +312,5 @@ namespace Buffer{
/// Entry point for Buffer, simply calls Buffer::Start(). /// Entry point for Buffer, simply calls Buffer::Start().
int main(int argc, char ** argv){ int main(int argc, char ** argv){
Buffer::Start(argc, argv); return Buffer::Start(argc, argv);
}//main }//main

View file

@ -35,13 +35,17 @@ Socket::Server server_socket(-1); ///< Placeholder for the server socket
/// Disconnecting the server_socket will terminate the main listening loop /// Disconnecting the server_socket will terminate the main listening loop
/// and cleanly shut down the process. /// and cleanly shut down the process.
void signal_handler (int signum){ void signal_handler (int signum){
if (!server_socket.connected()) return;
switch (signum){ switch (signum){
case SIGINT: break; case SIGINT: break;
case SIGHUP: break; case SIGHUP: break;
case SIGTERM: break; case SIGTERM: break;
case SIGCHLD:
wait(0);
return;
break;
default: return; break; default: return; break;
} }
if (!server_socket.connected()) return;
server_socket.close(); server_socket.close();
}//signal_handler }//signal_handler
@ -64,6 +68,7 @@ int main(int argc, char ** argv){
sigaction(SIGHUP, &new_action, NULL); sigaction(SIGHUP, &new_action, NULL);
sigaction(SIGTERM, &new_action, NULL); sigaction(SIGTERM, &new_action, NULL);
sigaction(SIGPIPE, &new_action, NULL); sigaction(SIGPIPE, &new_action, NULL);
sigaction(SIGCHLD, &new_action, NULL);
//default values //default values
int listen_port = DEFAULT_PORT; int listen_port = DEFAULT_PORT;
@ -174,9 +179,7 @@ int main(int argc, char ** argv){
} }
} }
int status;
while (server_socket.connected()){ while (server_socket.connected()){
while (waitpid((pid_t)-1, &status, WNOHANG) > 0){}//clean up all child processes
S = server_socket.accept(); S = server_socket.accept();
if (S.connected()){//check if the new connection is valid if (S.connected()){//check if the new connection is valid
pid_t myid = fork(); pid_t myid = fork();