Buffer stabilized

This commit is contained in:
Thulinma 2011-09-13 20:37:16 +02:00
parent 1b7ab627b5
commit 46bfeefb03
2 changed files with 20 additions and 19 deletions

View file

@ -93,7 +93,7 @@ namespace Buffer{
curr_up = 0; curr_up = 0;
curr_down = 0; curr_down = 0;
currsend = 0; currsend = 0;
myRing = Strm->getRing(); myRing = 0;
std::cout << "User " << MyNum << " connected" << std::endl; std::cout << "User " << MyNum << " connected" << std::endl;
}//constructor }//constructor
/// Drops held DTSC::Ring class, if one is held. /// Drops held DTSC::Ring class, if one is held.
@ -129,20 +129,23 @@ namespace Buffer{
}//doSend }//doSend
/// Try to send data to this user. Disconnects if any problems occur. /// Try to send data to this user. Disconnects if any problems occur.
void Send(){ void Send(){
if (!myRing){return;}//no ring!
if (!S.connected()){return;}//cancel if not connected if (!S.connected()){return;}//cancel if not connected
if (myRing->waiting){return;}//still waiting for next buffer? if (myRing->waiting){return;}//still waiting for next buffer?
if (myRing->starved){
//if corrupt data, warn and get new DTSC::Ring
std::cout << "Warning: User was send corrupt video data and send to the next keyframe!" << std::endl;
Strm->dropRing(myRing);
myRing = Strm->getRing();
}
currsend = 0;
//try to complete a send //try to complete a send
if (doSend(Strm->outPacket(myRing->b).c_str(), Strm->outPacket(myRing->b).length())){ if (doSend(Strm->outPacket(myRing->b).c_str(), Strm->outPacket(myRing->b).length())){
//switch to next buffer //switch to next buffer
if (myRing->b <= 0){myRing->waiting = true; return;}//no next buffer? go in waiting mode. if (myRing->b <= 0){myRing->waiting = true; return;}//no next buffer? go in waiting mode.
myRing->b--; myRing->b--;
if (myRing->starved){
//if corrupt data, warn and get new DTSC::Ring
std::cout << "Warning: User was send corrupt video data and send to the next keyframe!" << std::endl;
Strm->dropRing(myRing);
myRing = Strm->getRing();
}
currsend = 0; currsend = 0;
}//completed a send }//completed a send
}//send }//send
@ -159,8 +162,8 @@ namespace Buffer{
sigaction (SIGPIPE, &new_action, NULL); sigaction (SIGPIPE, &new_action, NULL);
//then check and parse the commandline //then check and parse the commandline
if (argc < 3) { if (argc < 2) {
std::cout << "usage: " << argv[0] << " streamname [awaiting_IP]" << std::endl; std::cout << "usage: " << argv[0] << " streamName [awaiting_IP]" << std::endl;
return 1; return 1;
} }
std::string waiting_ip = ""; std::string waiting_ip = "";
@ -229,6 +232,7 @@ namespace Buffer{
if (incoming.connected()){ if (incoming.connected()){
users.push_back(incoming); users.push_back(incoming);
//send the header //send the header
users.back().myRing = Strm->getRing();
if (!users.back().S.write(Strm->outHeader())){ if (!users.back().S.write(Strm->outHeader())){
/// \todo Do this more nicely? /// \todo Do this more nicely?
users.back().Disconnect("failed to receive the header!"); users.back().Disconnect("failed to receive the header!");
@ -297,6 +301,7 @@ namespace Buffer{
if (!(*usersIt).S.connected()){users.erase(usersIt);break;} if (!(*usersIt).S.connected()){users.erase(usersIt);break;}
} }
} }
delete Strm;
return 0; return 0;
} }

View file

@ -117,7 +117,7 @@ void DTSC::Stream::advanceRings(){
for (sit = rings.begin(); sit != rings.end(); sit++){ for (sit = rings.begin(); sit != rings.end(); sit++){
(*sit)->b++; (*sit)->b++;
if ((*sit)->waiting){(*sit)->waiting = false; (*sit)->b = 0;} if ((*sit)->waiting){(*sit)->waiting = false; (*sit)->b = 0;}
if ((*sit)->b >= buffers.size()){(*sit)->starved = true;} if ((*sit)->starved || ((*sit)->b >= buffers.size())){(*sit)->starved = true; (*sit)->b = 0;}
} }
for (dit = keyframes.begin(); dit != keyframes.end(); dit++){ for (dit = keyframes.begin(); dit != keyframes.end(); dit++){
dit->b++; dit->b++;
@ -375,7 +375,6 @@ std::string DTSC::DTMI::Pack(bool netpack){
/// \param name Indice name for any new object created. /// \param name Indice name for any new object created.
/// \returns A single DTSC::DTMI, parsed from the raw data. /// \returns A single DTSC::DTMI, parsed from the raw data.
DTSC::DTMI DTSC::parseOneDTMI(const unsigned char *& data, unsigned int &len, unsigned int &i, std::string name){ DTSC::DTMI DTSC::parseOneDTMI(const unsigned char *& data, unsigned int &len, unsigned int &i, std::string name){
std::string tmpstr;
unsigned int tmpi = 0; unsigned int tmpi = 0;
unsigned char tmpdbl[8]; unsigned char tmpdbl[8];
#if DEBUG >= 10 #if DEBUG >= 10
@ -394,20 +393,18 @@ DTSC::DTMI DTSC::parseOneDTMI(const unsigned char *& data, unsigned int &len, un
i+=9;//skip 8(an uint64_t)+1 forwards i+=9;//skip 8(an uint64_t)+1 forwards
return DTSC::DTMI(name, *(uint64_t*)tmpdbl, DTMI_INT); return DTSC::DTMI(name, *(uint64_t*)tmpdbl, DTMI_INT);
break; break;
case DTMI_STRING: case DTMI_STRING:{
tmpi = data[i+1]*256*256*256+data[i+2]*256*256+data[i+3]*256+data[i+4];//set tmpi to UTF-8-long length tmpi = data[i+1]*256*256*256+data[i+2]*256*256+data[i+3]*256+data[i+4];//set tmpi to UTF-8-long length
tmpstr.clear();//clean tmpstr, just to be sure std::string tmpstr = std::string((const char *)data+i+5, (size_t)tmpi);//set the string data
tmpstr.append((const char *)data+i+5, (size_t)tmpi);//add the string data
i += tmpi + 5;//skip length+size+1 forwards i += tmpi + 5;//skip length+size+1 forwards
return DTSC::DTMI(name, tmpstr, DTMI_STRING); return DTSC::DTMI(name, tmpstr, DTMI_STRING);
break; } break;
case DTMI_ROOT:{ case DTMI_ROOT:{
++i; ++i;
DTSC::DTMI ret(name, DTMI_ROOT); DTSC::DTMI ret(name, DTMI_ROOT);
while (data[i] + data[i+1] != 0){//while not encountering 0x0000 (we assume 0x0000EE) while (data[i] + data[i+1] != 0){//while not encountering 0x0000 (we assume 0x0000EE)
tmpi = data[i]*256+data[i+1];//set tmpi to the UTF-8 length tmpi = data[i]*256+data[i+1];//set tmpi to the UTF-8 length
tmpstr.clear();//clean tmpstr, just to be sure std::string tmpstr = std::string((const char *)data+i+2, (size_t)tmpi);//set the string data
tmpstr.append((const char*)data+i+2, (size_t)tmpi);//add the string data
i += tmpi + 2;//skip length+size forwards i += tmpi + 2;//skip length+size forwards
ret.addContent(parseOneDTMI(data, len, i, tmpstr));//add content, recursively parsed, updating i, setting indice to tmpstr ret.addContent(parseOneDTMI(data, len, i, tmpstr));//add content, recursively parsed, updating i, setting indice to tmpstr
} }
@ -419,8 +416,7 @@ DTSC::DTMI DTSC::parseOneDTMI(const unsigned char *& data, unsigned int &len, un
DTSC::DTMI ret(name, DTMI_OBJECT); DTSC::DTMI ret(name, DTMI_OBJECT);
while (data[i] + data[i+1] != 0){//while not encountering 0x0000 (we assume 0x0000EE) while (data[i] + data[i+1] != 0){//while not encountering 0x0000 (we assume 0x0000EE)
tmpi = data[i]*256+data[i+1];//set tmpi to the UTF-8 length tmpi = data[i]*256+data[i+1];//set tmpi to the UTF-8 length
tmpstr.clear();//clean tmpstr, just to be sure std::string tmpstr = std::string((const char *)data+i+2, (size_t)tmpi);//set the string data
tmpstr.append((const char*)data+i+2, (size_t)tmpi);//add the string data
i += tmpi + 2;//skip length+size forwards i += tmpi + 2;//skip length+size forwards
ret.addContent(parseOneDTMI(data, len, i, tmpstr));//add content, recursively parsed, updating i, setting indice to tmpstr ret.addContent(parseOneDTMI(data, len, i, tmpstr));//add content, recursively parsed, updating i, setting indice to tmpstr
} }