Fixes to MistBuffer for live input as well as the input side of the RTMP connector.

This commit is contained in:
Thulinma 2012-11-17 12:14:28 +01:00
parent 0ca46557a5
commit 26d9a6cabf
4 changed files with 27 additions and 21 deletions

View file

@ -37,6 +37,7 @@ namespace Buffer{
Socket::Connection StatsSocket = Socket::Connection("/tmp/mist/statistics", true); Socket::Connection StatsSocket = Socket::Connection("/tmp/mist/statistics", true);
while (buffer_running){ while (buffer_running){
usleep(1000000); //sleep one second usleep(1000000); //sleep one second
Stream::get()->cleanUsers();
if (!StatsSocket.connected()){ if (!StatsSocket.connected()){
StatsSocket = Socket::Connection("/tmp/mist/statistics", true); StatsSocket = Socket::Connection("/tmp/mist/statistics", true);
} }
@ -56,8 +57,9 @@ namespace Buffer{
#endif #endif
usr->myRing = thisStream->getRing(); usr->myRing = thisStream->getRing();
usr->S.Send(thisStream->getHeader()); if (thisStream->getHeader().size() > 0){
usr->S.flush(); usr->S.SendNow(thisStream->getHeader());
}
while (usr->S.connected()){ while (usr->S.connected()){
usleep(5000); //sleep 5ms usleep(5000); //sleep 5ms
@ -114,7 +116,6 @@ namespace Buffer{
} }
} }
usr->Disconnect("Socket closed."); usr->Disconnect("Socket closed.");
thisStream->cleanUsers();
} }
/// Loop reading DTSC data from stdin and processing it at the correct speed. /// Loop reading DTSC data from stdin and processing it at the correct speed.
@ -161,8 +162,8 @@ namespace Buffer{
if (thisStream->getIPInput().connected()){ if (thisStream->getIPInput().connected()){
if (thisStream->getIPInput().spool()){ if (thisStream->getIPInput().spool()){
thisStream->getWriteLock(); thisStream->getWriteLock();
if (thisStream->getStream()->parsePacket(thisStream->getIPInput().Received().get())){ if (thisStream->getStream()->parsePacket(thisStream->getIPInput().Received())){
thisStream->getStream()->outPacket(0); //thisStream->getStream()->outPacket(0);
thisStream->dropWriteLock(true); thisStream->dropWriteLock(true);
}else{ }else{
thisStream->dropWriteLock(false); thisStream->dropWriteLock(false);
@ -223,10 +224,15 @@ namespace Buffer{
// disconnect listener // disconnect listener
buffer_running = false; buffer_running = false;
std::cout << "End of input file - buffer shutting down" << std::endl; std::cout << "Buffer shutting down" << std::endl;
SS.close(); SS.close();
if (StatsThread){StatsThread->join();} if (StatsThread){
StatsThread->join();
delete StatsThread;
}
if (thisStream->getIPInput().connected()){thisStream->getIPInput().close();}
StdinThread->join(); StdinThread->join();
delete StdinThread;
delete thisStream; delete thisStream;
return 0; return 0;
} }

View file

@ -32,10 +32,8 @@ Buffer::Stream::~Stream(){
stats_mutex.lock(); stats_mutex.lock();
for (usersIt = users.begin(); usersIt != users.end(); usersIt++){ for (usersIt = users.begin(); usersIt != users.end(); usersIt++){
if ((**usersIt).S.connected()){ if ((**usersIt).S.connected()){
if ((**usersIt).myRing->waiting){ (**usersIt).S.close();
(**usersIt).S.close(); printf("Closing user %s\n", (**usersIt).MyStr.c_str());
printf("Closing user %s\n", (**usersIt).MyStr.c_str());
}
} }
} }
stats_mutex.unlock(); stats_mutex.unlock();
@ -146,7 +144,6 @@ void Buffer::Stream::clearStats(std::string username, Stats & stats, std::string
Storage["log"][username]["host"] = stats.host; Storage["log"][username]["host"] = stats.host;
Storage["log"][username]["start"] = Util::epoch() - stats.conntime; Storage["log"][username]["start"] = Util::epoch() - stats.conntime;
stats_mutex.unlock(); stats_mutex.unlock();
cleanUsers();
} }
/// Cleans up broken connections /// Cleans up broken connections
@ -162,6 +159,14 @@ void Buffer::Stream::cleanUsers(){
users.erase(usersIt); users.erase(usersIt);
repeat = true; repeat = true;
break; break;
}else{
if (!(**usersIt).S.connected()){
if ((**usersIt).Thread->joinable()){
(**usersIt).Thread->join();
delete (**usersIt).Thread;
(**usersIt).Thread = 0;
}
}
} }
} }
} }

View file

@ -31,19 +31,14 @@ Buffer::user::~user(){
/// Disconnects the current user. Doesn't do anything if already disconnected. /// Disconnects the current user. Doesn't do anything if already disconnected.
/// Prints "Disconnected user" to stdout if disconnect took place. /// Prints "Disconnected user" to stdout if disconnect took place.
void Buffer::user::Disconnect(std::string reason) { void Buffer::user::Disconnect(std::string reason) {
Stream::get()->clearStats(MyStr, lastStats, reason);
if (S.connected()){S.close();} if (S.connected()){S.close();}
if (Thread != 0){ Stream::get()->clearStats(MyStr, lastStats, reason);
if (Thread->joinable()){
Thread->join();
}
Thread = 0;
}
}//Disconnect }//Disconnect
/// Tries to send the current buffer, returns true if success, false otherwise. /// Tries to send the current buffer, returns true if success, false otherwise.
/// Has a side effect of dropping the connection if send will never complete. /// Has a side effect of dropping the connection if send will never complete.
bool Buffer::user::doSend(const char * ptr, int len){ bool Buffer::user::doSend(const char * ptr, int len){
if (!len){return false;}//do not do empty sends
int r = S.iwrite(ptr+currsend, len-currsend); int r = S.iwrite(ptr+currsend, len-currsend);
if (r <= 0){ if (r <= 0){
if (errno == EWOULDBLOCK){return false;} if (errno == EWOULDBLOCK){return false;}

View file

@ -291,9 +291,9 @@ void Connector_RTMP::parseChunk(Socket::Buffer & inbuffer){
if (counter > 8){ if (counter > 8){
sending = true; sending = true;
SS.SendNow(meta_out.toNetPacked()); SS.SendNow(meta_out.toNetPacked());
SS.SendNow(prebuffer.str().c_str());//write buffer SS.SendNow(prebuffer.str().c_str(), prebuffer.str().size());//write buffer
prebuffer.str("");//clear buffer prebuffer.str("");//clear buffer
SS.Send(pack_out.toNetPacked()); SS.SendNow(pack_out.toNetPacked());
}else{ }else{
prebuffer << pack_out.toNetPacked(); prebuffer << pack_out.toNetPacked();
} }