Stable multi-user buffer and re-enabled push support. Renamed DDV_->Mist. Closes #25

This commit is contained in:
Thulinma 2012-04-27 21:07:26 +02:00
parent 9ae274b0c1
commit 6c588e51fc
21 changed files with 613 additions and 382 deletions

View file

@ -12,27 +12,14 @@
#include <signal.h>
#include <sstream>
#include <sys/time.h>
#include "../util/dtsc.h" //DTSC support
#include "../util/socket.h" //Socket lib
#include "../util/json.h"
#include "../util/tinythread.h"
#include "stream.h"
/// Holds all code unique to the Buffer.
namespace Buffer{
class user;//forward declaration
JSON::Value Storage; ///< Global storage of data.
DTSC::Stream * Strm = 0;
std::string waiting_ip = ""; ///< IP address for media push.
Socket::Connection ip_input; ///< Connection used for media push.
tthread::mutex stats_mutex; ///< Mutex for stats modifications.
tthread::mutex transfer_mutex; ///< Mutex for data transfers.
tthread::mutex socket_mutex; ///< Mutex for user deletion/work.
bool buffer_running = true; ///< Set to false when shutting down.
std::vector<user> users; ///< All connected users.
std::vector<user>::iterator usersIt; ///< Iterator for all connected users.
std::string name; ///< Name for this buffer.
tthread::condition_variable moreData; ///< Triggered when more data becomes available.
volatile bool buffer_running = true; ///< Set to false when shutting down.
Stream * thisStream = 0;
Socket::Server SS; ///< The server socket.
/// Gets the current system time in milliseconds.
unsigned int getNowMS(){
@ -50,40 +37,18 @@ namespace Buffer{
default: return; break;
}
}
}
#include "stats.cpp"
#include "user.cpp"
namespace Buffer{
void handleStats(void * empty){
if (empty != 0){return;}
Socket::Connection StatsSocket = Socket::Connection("/tmp/ddv_statistics", true);
Socket::Connection StatsSocket = Socket::Connection("/tmp/mist/statistics", true);
while (buffer_running){
usleep(1000000); //sleep one second
unsigned int now = time(0);
unsigned int tot_up = 0, tot_down = 0, tot_count = 0;
stats_mutex.lock();
if (users.size() > 0){
for (usersIt = users.begin(); usersIt != users.end(); usersIt++){
tot_down += usersIt->curr_down;
tot_up += usersIt->curr_up;
tot_count++;
}
}
Storage["totals"]["down"] = tot_down;
Storage["totals"]["up"] = tot_up;
Storage["totals"]["count"] = tot_count;
Storage["totals"]["now"] = now;
Storage["totals"]["buffer"] = name;
if (!StatsSocket.connected()){
StatsSocket = Socket::Connection("/tmp/ddv_statistics", true);
StatsSocket = Socket::Connection("/tmp/mist/statistics", true);
}
if (StatsSocket.connected()){
StatsSocket.write(Storage.toString()+"\n\n");
Storage["log"].null();
StatsSocket.write(Stream::get()->getStats()+"\n\n");
}
stats_mutex.unlock();
}
}
@ -91,8 +56,8 @@ namespace Buffer{
user * usr = (user*)v_usr;
std::cerr << "Thread launched for user " << usr->MyStr << ", socket number " << usr->S.getSocket() << std::endl;
usr->myRing = Strm->getRing();
if (!usr->S.write(Strm->outHeader())){
usr->myRing = thisStream->getRing();
if (!usr->S.write(thisStream->getHeader())){
usr->Disconnect("failed to receive the header!");
return;
}
@ -108,10 +73,9 @@ namespace Buffer{
if (usr->inbuffer != ""){
if (usr->inbuffer[0] == 'P'){
std::cout << "Push attempt from IP " << usr->inbuffer.substr(2) << std::endl;
if (usr->inbuffer.substr(2) == waiting_ip){
if (!ip_input.connected()){
if (thisStream->checkWaitingIP(usr->inbuffer.substr(2))){
if (thisStream->setInput(usr->S)){
std::cout << "Push accepted!" << std::endl;
ip_input = usr->S;
usr->S = Socket::Connection(-1);
return;
}else{
@ -122,38 +86,23 @@ namespace Buffer{
}
}
if (usr->inbuffer[0] == 'S'){
stats_mutex.lock();
usr->tmpStats = Stats(usr->inbuffer.substr(2));
unsigned int secs = usr->tmpStats.conntime - usr->lastStats.conntime;
if (secs < 1){secs = 1;}
usr->curr_up = (usr->tmpStats.up - usr->lastStats.up) / secs;
usr->curr_down = (usr->tmpStats.down - usr->lastStats.down) / secs;
usr->lastStats = usr->tmpStats;
Storage["curr"][usr->MyStr]["connector"] = usr->tmpStats.connector;
Storage["curr"][usr->MyStr]["up"] = usr->tmpStats.up;
Storage["curr"][usr->MyStr]["down"] = usr->tmpStats.down;
Storage["curr"][usr->MyStr]["conntime"] = usr->tmpStats.conntime;
Storage["curr"][usr->MyStr]["host"] = usr->tmpStats.host;
Storage["curr"][usr->MyStr]["start"] = (unsigned int) time(0) - usr->tmpStats.conntime;
stats_mutex.unlock();
thisStream->saveStats(usr->MyStr, usr->tmpStats);
}
}
}
usr->Send();
}
stats_mutex.lock();
if (users.size() > 0){
for (usersIt = users.begin(); usersIt != users.end(); usersIt++){
if (!(*usersIt).S.connected()){
users.erase(usersIt);
break;
}
}
}
stats_mutex.unlock();
thisStream->cleanUsers();
std::cerr << "User " << usr->MyStr << " disconnected, socket number " << usr->S.getSocket() << std::endl;
}
/// Loop reading DTSC data from stdin and processing it at the correct speed.
void handleStdin(void * empty){
if (empty != 0){return;}
unsigned int lastPacketTime = 0;//time in MS last packet was parsed
@ -167,28 +116,49 @@ namespace Buffer{
while (std::cin.good() && buffer_running){
//slow down packet receiving to real-time
now = getNowMS();
if ((now - lastPacketTime > currPacketTime - prevPacketTime) || (currPacketTime <= prevPacketTime)){
if ((now - lastPacketTime >= currPacketTime - prevPacketTime) || (currPacketTime <= prevPacketTime)){
std::cin.read(charBuffer, 1024*10);
charCount = std::cin.gcount();
inBuffer.append(charBuffer, charCount);
transfer_mutex.lock();
if (Strm->parsePacket(inBuffer)){
Strm->outPacket(0);
thisStream->getWriteLock();
if (thisStream->getStream()->parsePacket(inBuffer)){
thisStream->getStream()->outPacket(0);
lastPacketTime = now;
prevPacketTime = currPacketTime;
currPacketTime = Strm->getTime();
moreData.notify_all();
currPacketTime = thisStream->getStream()->getTime();
}
transfer_mutex.unlock();
thisStream->dropWriteLock();
}else{
if (((currPacketTime - prevPacketTime) - (now - lastPacketTime)) > 1000){
usleep(1000000);
if (((currPacketTime - prevPacketTime) - (now - lastPacketTime)) > 999){
usleep(999000);
}else{
usleep(((currPacketTime - prevPacketTime) - (now - lastPacketTime)) * 1000);
usleep(((currPacketTime - prevPacketTime) - (now - lastPacketTime)) * 999);
}
}
}
buffer_running = false;
SS.close();
}
/// Loop reading DTSC data from an IP push address.
/// No changes to the speed are made.
void handlePushin(void * empty){
if (empty != 0){return;}
std::string inBuffer;
while (buffer_running){
if (thisStream->getIPInput().connected()){
if (thisStream->getIPInput().iread(inBuffer)){
thisStream->getWriteLock();
if (thisStream->getStream()->parsePacket(inBuffer)){
thisStream->getStream()->outPacket(0);
}
thisStream->dropWriteLock();
}
}else{
usleep(1000000);
}
}
SS.close();
}
/// Starts a loop, waiting for connections to send data to.
@ -206,62 +176,49 @@ namespace Buffer{
std::cout << "usage: " << argv[0] << " streamName [awaiting_IP]" << std::endl;
return 1;
}
name = argv[1];
std::string name = argv[1];
bool ip_waiting = false;
std::string waiting_ip;
if (argc >= 4){
waiting_ip += argv[2];
ip_waiting = true;
}
std::string shared_socket = "/tmp/shared_socket_";
shared_socket += name;
Socket::Server SS(shared_socket, false);
Strm = new DTSC::Stream(5);
SS = Socket::makeStream(name);
thisStream = Stream::get();
thisStream->setName(name);
if (ip_waiting){
thisStream->setWaitingIP(waiting_ip);
}
Socket::Connection incoming;
Socket::Connection std_input(fileno(stdin));
Storage["log"].null();
Storage["curr"].null();
Storage["totals"].null();
//tthread::thread StatsThread = tthread::thread(handleStats, 0);
tthread::thread StatsThread = tthread::thread(handleStats, 0);
tthread::thread * StdinThread = 0;
if (!ip_waiting){
StdinThread = new tthread::thread(handleStdin, 0);
}else{
StdinThread = new tthread::thread(handlePushin, 0);
}
while (buffer_running){
while (buffer_running && SS.connected()){
//check for new connections, accept them if there are any
//starts a thread for every accepted connection
incoming = SS.accept(false);
if (incoming.connected()){
stats_mutex.lock();
users.push_back(incoming);
user * usr_ptr = &(users.back());
stats_mutex.unlock();
user * usr_ptr = new user(incoming);
thisStream->addUser(usr_ptr);
usr_ptr->Thread = new tthread::thread(handleUser, (void *)usr_ptr);
}
}//main loop
// disconnect listener
/// \todo Deal with EOF more nicely - doesn't send the end of the stream to all users!
buffer_running = false;
std::cout << "Buffer shutting down" << std::endl;
std::cout << "End of input file - buffer shutting down" << std::endl;
SS.close();
//StatsThread.join();
if (StdinThread){StdinThread->join();}
if (users.size() > 0){
stats_mutex.lock();
for (usersIt = users.begin(); usersIt != users.end(); usersIt++){
if ((*usersIt).S.connected()){
(*usersIt).Disconnect("Terminating...");
}
}
stats_mutex.unlock();
}
delete Strm;
StatsThread.join();
StdinThread->join();
delete thisStream;
return 0;
}