Assorted buffer fixes - stdin input is now considered stable, push (ie: RTMP) input seems to still have a few issues.

This commit is contained in:
Thulinma 2013-02-19 16:14:15 +01:00
parent 29426200f6
commit 14962f88e9
3 changed files with 81 additions and 85 deletions

View file

@ -58,69 +58,73 @@ namespace Buffer {
#endif #endif
usr->myRing = thisStream->getRing(); usr->myRing = thisStream->getRing();
if (thisStream->getHeader().size() > 0){ if (thisStream->getStream()->metadata && thisStream->getHeader().size() > 0){
usr->S.SendNow(thisStream->getHeader()); usr->S.SendNow(thisStream->getHeader());
} }
while (usr->S.connected()){ while (usr->S.connected()){
usleep(5000); //sleep 5ms usleep(5000); //sleep 5ms
usr->Send(); usr->Send();
if (usr->S.spool() && usr->S.Received().size()){ if (usr->S.spool()){
//delete anything that doesn't end with a newline while (usr->S.Received().size()){
if ( !usr->S.Received().get().empty() && *(usr->S.Received().get().rbegin()) != '\n'){ //delete anything that doesn't end with a newline
usr->S.Received().get().clear(); if ( !usr->S.Received().get().empty() && *(usr->S.Received().get().rbegin()) != '\n'){
continue; usr->S.Received().get().clear();
} continue;
usr->S.Received().get().resize(usr->S.Received().get().size() - 1); }
if ( !usr->S.Received().get().empty()){ usr->S.Received().get().resize(usr->S.Received().get().size() - 1);
switch (usr->S.Received().get()[0]){ if ( !usr->S.Received().get().empty()){
case 'P': { //Push switch (usr->S.Received().get()[0]){
std::cout << "Push attempt from IP " << usr->S.Received().get().substr(2) << std::endl; case 'P': { //Push
if (thisStream->checkWaitingIP(usr->S.Received().get().substr(2))){ std::cout << "Push attempt from IP " << usr->S.Received().get().substr(2) << std::endl;
if (thisStream->setInput(usr->S)){ if (thisStream->checkWaitingIP(usr->S.Received().get().substr(2))){
std::cout << "Push accepted!" << std::endl; usr->S.Received().get().clear();
usr->S = Socket::Connection( -1); if (thisStream->setInput(usr->S)){
return; std::cout << "Push accepted!" << std::endl;
usr->S = Socket::Connection( -1);
return;
}else{
usr->Disconnect("Push denied - push already in progress!");
}
}else{ }else{
usr->Disconnect("Push denied - push already in progress!"); usr->Disconnect("Push denied - invalid IP address!");
} }
}else{
usr->Disconnect("Push denied - invalid IP address!");
} }
} break;
break; case 'S': { //Stats
case 'S': { //Stats usr->tmpStats = Stats(usr->S.Received().get().substr(2));
usr->tmpStats = Stats(usr->S.Received().get().substr(2)); unsigned int secs = usr->tmpStats.conntime - usr->lastStats.conntime;
unsigned int secs = usr->tmpStats.conntime - usr->lastStats.conntime; if (secs < 1){
if (secs < 1){ secs = 1;
secs = 1; }
usr->curr_up = (usr->tmpStats.up - usr->lastStats.up) / secs;
usr->curr_down = (usr->tmpStats.down - usr->lastStats.down) / secs;
usr->lastStats = usr->tmpStats;
thisStream->saveStats(usr->MyStr, usr->tmpStats);
} }
usr->curr_up = (usr->tmpStats.up - usr->lastStats.up) / secs; break;
usr->curr_down = (usr->tmpStats.down - usr->lastStats.down) / secs; case 's': { //second-seek
usr->lastStats = usr->tmpStats; //ignored for now
thisStream->saveStats(usr->MyStr, usr->tmpStats); }
break;
case 'f': { //frame-seek
//ignored for now
}
break;
case 'p': { //play
//ignored for now
}
break;
case 'o': { //once-play
//ignored for now
}
break;
case 'q': { //quit-playing
//ignored for now
}
break;
} }
break; usr->S.Received().get().clear();
case 's': { //second-seek
//ignored for now
}
break;
case 'f': { //frame-seek
//ignored for now
}
break;
case 'p': { //play
//ignored for now
}
break;
case 'o': { //once-play
//ignored for now
}
break;
case 'q': { //quit-playing
//ignored for now
}
break;
} }
} }
} }
@ -163,7 +167,6 @@ namespace Buffer {
} }
} }
buffer_running = false; buffer_running = false;
SS.close();
} }
/// Loop reading DTSC data from an IP push address. /// Loop reading DTSC data from an IP push address.
@ -175,14 +178,18 @@ namespace Buffer {
while (buffer_running){ while (buffer_running){
if (thisStream->getIPInput().connected()){ if (thisStream->getIPInput().connected()){
if (thisStream->getIPInput().spool()){ if (thisStream->getIPInput().spool()){
thisStream->getWriteLock(); bool packed_parsed = false;
if (thisStream->getStream()->parsePacket(thisStream->getIPInput().Received())){ do {
//thisStream->getStream()->outPacket(0); thisStream->getWriteLock();
thisStream->dropWriteLock(true); if (thisStream->getStream()->parsePacket(thisStream->getIPInput().Received())){
}else{ thisStream->dropWriteLock(true);
thisStream->dropWriteLock(false); packed_parsed = true;
usleep(1000); //1ms wait }else{
} thisStream->dropWriteLock(false);
packed_parsed = false;
usleep(1000); //1ms wait
}
} while(packed_parsed);
}else{ }else{
usleep(1000); //1ms wait usleep(1000); //1ms wait
} }
@ -190,7 +197,6 @@ namespace Buffer {
usleep(1000000); //1s wait usleep(1000000); //1s wait
} }
} }
SS.close();
} }
/// Starts a loop, waiting for connections to send data to. /// Starts a loop, waiting for connections to send data to.

View file

@ -30,18 +30,14 @@ Buffer::Stream::Stream(){
/// Do cleanup on delete. /// Do cleanup on delete.
Buffer::Stream::~Stream(){ Buffer::Stream::~Stream(){
while (users.size() > 0){ tthread::lock_guard<tthread::mutex> guard(stats_mutex);
stats_mutex.lock(); for (usersIt = users.begin(); usersIt != users.end(); usersIt++){
for (usersIt = users.begin(); usersIt != users.end(); usersIt++){ if (( * *usersIt).S.connected()){
if (( * *usersIt).S.connected()){ ( * *usersIt).S.close();
( * *usersIt).S.close(); printf("Closing user %s\n", ( * *usersIt).MyStr.c_str());
printf("Closing user %s\n", ( * *usersIt).MyStr.c_str());
}
} }
stats_mutex.unlock();
moreData.notify_all();
cleanUsers();
} }
moreData.notify_all();
delete Strm; delete Strm;
} }
@ -50,7 +46,7 @@ std::string & Buffer::Stream::getStats(){
static std::string ret; static std::string ret;
long long int now = Util::epoch(); long long int now = Util::epoch();
unsigned int tot_up = 0, tot_down = 0, tot_count = 0; unsigned int tot_up = 0, tot_down = 0, tot_count = 0;
stats_mutex.lock(); tthread::lock_guard<tthread::mutex> guard(stats_mutex);
if (users.size() > 0){ if (users.size() > 0){
for (usersIt = users.begin(); usersIt != users.end(); usersIt++){ for (usersIt = users.begin(); usersIt != users.end(); usersIt++){
tot_down += ( * *usersIt).curr_down; tot_down += ( * *usersIt).curr_down;
@ -72,7 +68,6 @@ std::string & Buffer::Stream::getStats(){
} }
ret = Storage.toString(); ret = Storage.toString();
Storage["log"].null(); Storage["log"].null();
stats_mutex.unlock();
return ret; return ret;
} }
@ -123,19 +118,18 @@ Socket::Connection & Buffer::Stream::getIPInput(){
/// Stores intermediate statistics. /// Stores intermediate statistics.
void Buffer::Stream::saveStats(std::string username, Stats & stats){ void Buffer::Stream::saveStats(std::string username, Stats & stats){
stats_mutex.lock(); tthread::lock_guard<tthread::mutex> guard(stats_mutex);
Storage["curr"][username]["connector"] = stats.connector; Storage["curr"][username]["connector"] = stats.connector;
Storage["curr"][username]["up"] = stats.up; Storage["curr"][username]["up"] = stats.up;
Storage["curr"][username]["down"] = stats.down; Storage["curr"][username]["down"] = stats.down;
Storage["curr"][username]["conntime"] = stats.conntime; Storage["curr"][username]["conntime"] = stats.conntime;
Storage["curr"][username]["host"] = stats.host; Storage["curr"][username]["host"] = stats.host;
Storage["curr"][username]["start"] = Util::epoch() - stats.conntime; Storage["curr"][username]["start"] = Util::epoch() - stats.conntime;
stats_mutex.unlock();
} }
/// Stores final statistics. /// Stores final statistics.
void Buffer::Stream::clearStats(std::string username, Stats & stats, std::string reason){ void Buffer::Stream::clearStats(std::string username, Stats & stats, std::string reason){
stats_mutex.lock(); tthread::lock_guard<tthread::mutex> guard(stats_mutex);
if (Storage["curr"].isMember(username)){ if (Storage["curr"].isMember(username)){
Storage["curr"].removeMember(username); Storage["curr"].removeMember(username);
#if DEBUG >= 4 #if DEBUG >= 4
@ -149,13 +143,12 @@ void Buffer::Stream::clearStats(std::string username, Stats & stats, std::string
Storage["log"][username]["conntime"] = stats.conntime; Storage["log"][username]["conntime"] = stats.conntime;
Storage["log"][username]["host"] = stats.host; Storage["log"][username]["host"] = stats.host;
Storage["log"][username]["start"] = Util::epoch() - stats.conntime; Storage["log"][username]["start"] = Util::epoch() - stats.conntime;
stats_mutex.unlock();
} }
/// Cleans up broken connections /// Cleans up broken connections
void Buffer::Stream::cleanUsers(){ void Buffer::Stream::cleanUsers(){
bool repeat = false; bool repeat = false;
stats_mutex.lock(); tthread::lock_guard<tthread::mutex> guard(stats_mutex);
do{ do{
repeat = false; repeat = false;
if (users.size() > 0){ if (users.size() > 0){
@ -177,7 +170,6 @@ void Buffer::Stream::cleanUsers(){
} }
} }
}while (repeat); }while (repeat);
stats_mutex.unlock();
} }
/// Blocks until writing is safe. /// Blocks until writing is safe.
@ -231,14 +223,12 @@ void Buffer::Stream::setName(std::string n){
/// Add a user to the userlist. /// Add a user to the userlist.
void Buffer::Stream::addUser(user * new_user){ void Buffer::Stream::addUser(user * new_user){
stats_mutex.lock(); tthread::lock_guard<tthread::mutex> guard(stats_mutex);
users.push_back(new_user); users.push_back(new_user);
stats_mutex.unlock();
} }
/// Blocks the thread until new data is available. /// Blocks the thread until new data is available.
void Buffer::Stream::waitForData(){ void Buffer::Stream::waitForData(){
stats_mutex.lock(); tthread::lock_guard<tthread::mutex> guard(stats_mutex);
moreData.wait(stats_mutex); moreData.wait(stats_mutex);
stats_mutex.unlock();
} }

View file

@ -42,7 +42,7 @@ void Buffer::user::Disconnect(std::string reason){
/// Has a side effect of dropping the connection if send will never complete. /// Has a side effect of dropping the connection if send will never complete.
bool Buffer::user::doSend(const char * ptr, int len){ bool Buffer::user::doSend(const char * ptr, int len){
if ( !len){ if ( !len){
return false; return true;
} //do not do empty sends } //do not do empty sends
int r = S.iwrite(ptr + currsend, len - currsend); int r = S.iwrite(ptr + currsend, len - currsend);
if (r <= 0){ if (r <= 0){