diff --git a/src/buffer.cpp b/src/buffer.cpp index 0004644e..6d75a2d0 100644 --- a/src/buffer.cpp +++ b/src/buffer.cpp @@ -64,71 +64,76 @@ namespace Buffer { while (usr->S.connected()){ usleep(5000); //sleep 5ms - if( !usr->Send()){ - if (usr->S.spool()){ - while (usr->S.Received().size()){ - //delete anything that doesn't end with a newline - if ( !usr->S.Received().get().empty() && *(usr->S.Received().get().rbegin()) != '\n'){ - usr->S.Received().get().clear(); - continue; - } - usr->S.Received().get().resize(usr->S.Received().get().size() - 1); - if ( !usr->S.Received().get().empty()){ - switch (usr->S.Received().get()[0]){ - case 'P': { //Push - std::cout << "Push attempt from IP " << usr->S.Received().get().substr(2) << std::endl; - if (thisStream->checkWaitingIP(usr->S.Received().get().substr(2))){ - usr->S.Received().get().clear(); - if (thisStream->setInput(usr->S)){ - std::cout << "Push accepted!" << std::endl; - usr->S = Socket::Connection( -1); - return; - }else{ - usr->Disconnect("Push denied - push already in progress!"); - } - }else{ - usr->Disconnect("Push denied - invalid IP address!"); - } - break; - } - case 'S': { //Stats - usr->tmpStats = Stats(usr->S.Received().get().substr(2)); - unsigned int secs = usr->tmpStats.conntime - usr->lastStats.conntime; - if (secs < 1){ - secs = 1; - } - usr->curr_up = (usr->tmpStats.up - usr->lastStats.up) / secs; - usr->curr_down = (usr->tmpStats.down - usr->lastStats.down) / secs; - usr->lastStats = usr->tmpStats; - thisStream->saveStats(usr->MyStr, usr->tmpStats); - break; - } - case 's': { //second-seek - unsigned int ms = JSON::Value(usr->S.Received().get().substr(2)).asInt(); - usr->myRing.waiting = false; - usr->myRing.starved = false; - usr->myRing.b = thisStream->getStream()->msSeek(ms); - break; - } - case 'f': { //frame-seek - //ignored for now - break; - } - case 'p': { //play - //ignored for now - break; - } - case 'o': { //once-play - //ignored for now - break; - } - case 'q': { //quit-playing - //ignored for now - break; - } + if( !usr->myRing->playCount || !usr->Send()){ + if (usr->S.spool()){ + while (usr->S.Received().size()){ + //delete anything that doesn't end with a newline + if ( !usr->S.Received().get().empty() && *(usr->S.Received().get().rbegin()) != '\n'){ + usr->S.Received().get().clear(); + continue; + } + usr->S.Received().get().resize(usr->S.Received().get().size() - 1); + if ( !usr->S.Received().get().empty()){ + switch (usr->S.Received().get()[0]){ + case 'P': { //Push + std::cout << "Push attempt from IP " << usr->S.Received().get().substr(2) << std::endl; + if (thisStream->checkWaitingIP(usr->S.Received().get().substr(2))){ + usr->S.Received().get().clear(); + if (thisStream->setInput(usr->S)){ + std::cout << "Push accepted!" << std::endl; + usr->S = Socket::Connection( -1); + return; + }else{ + usr->Disconnect("Push denied - push already in progress!"); + } + }else{ + usr->Disconnect("Push denied - invalid IP address!"); + } + break; + } + case 'S': { //Stats + usr->tmpStats = Stats(usr->S.Received().get().substr(2)); + unsigned int secs = usr->tmpStats.conntime - usr->lastStats.conntime; + if (secs < 1){ + secs = 1; + } + usr->curr_up = (usr->tmpStats.up - usr->lastStats.up) / secs; + usr->curr_down = (usr->tmpStats.down - usr->lastStats.down) / secs; + usr->lastStats = usr->tmpStats; + thisStream->saveStats(usr->MyStr, usr->tmpStats); + break; + } + case 's': { //second-seek + unsigned int ms = JSON::Value(usr->S.Received().get().substr(2)).asInt(); + usr->myRing->waiting = false; + usr->myRing->starved = false; + usr->myRing->b = thisStream->getStream()->msSeek(ms); + if (usr->myRing->playCount > 0 ) { + usr->myRing->playCount = 0; + } + break; + } + case 'f': { //frame-seek + //ignored for now + break; + } + case 'p': { //play + usr->myRing->playCount = -1; + break; + } + case 'o': { //once-play + if (usr->myRing->playCount >= 0 ) { + usr->myRing->playCount++; + } + break; + } + case 'q': { //quit-playing + usr->myRing->playCount = 0; + break; + } + } + usr->S.Received().get().clear(); } - usr->S.Received().get().clear(); - } } } } @@ -152,30 +157,22 @@ namespace Buffer { //slow down packet receiving to real-time now = getNowMS(); if ((now - timeDiff >= lastPacket) || (lastPacket - (now - timeDiff) > 15000)){ - fprintf( stderr, "Obtaining write lock... " ); thisStream->getWriteLock(); - fprintf( stderr, "Done.\n" ); if (thisStream->getStream()->parsePacket(inBuffer)){ - fprintf( stderr, "Receiving a packet... " ); thisStream->getStream()->outPacket(0); lastPacket = thisStream->getStream()->getTime(); if ((now - timeDiff - lastPacket) > 15000 || (now - timeDiff - lastPacket < -15000)){ timeDiff = now - lastPacket; } thisStream->dropWriteLock(true); - fprintf( stderr, "Done.\n" ); }else{ - fprintf( stderr, "Not receiving a packet... " ); thisStream->dropWriteLock(false); std::cin.read(charBuffer, 1024 * 10); charCount = std::cin.gcount(); inBuffer.append(charBuffer, charCount); - fprintf( stderr, "Done.\n" ); } }else{ - fprintf( stderr, "Sleeping: %d...", std::min(14999LL, lastPacket - (now - timeDiff)) * 1000 ); usleep(std::min(14999LL, lastPacket - (now - timeDiff)) * 1000); - fprintf( stderr, "Done.\n" ); } } buffer_running = false; @@ -222,6 +219,8 @@ namespace Buffer { "{\"arg_num\":2, \"arg\":\"string\", \"default\":\"\", \"help\":\"IP address to expect incoming data from. This will completely disable reading from standard input if used.\"}")); conf.addOption("reportstats", JSON::fromString("{\"default\":0, \"help\":\"Report stats to a controller process.\", \"short\":\"s\", \"long\":\"reportstats\"}")); + conf.addOption("time", + JSON::fromString("{\"default\":0, \"arg\": \"integer\", \"help\":\"Buffer a specied amount of time in ms.\", \"short\":\"t\", \"long\":\"time\"}")); conf.parseArgs(argc, argv); std::string name = conf.getString("stream_name"); @@ -234,6 +233,7 @@ namespace Buffer { conf.activate(); thisStream = Stream::get(); thisStream->setName(name); + thisStream->getStream()->setBufferTime(conf.getInteger("time")); Socket::Connection incoming; Socket::Connection std_input(fileno(stdin)); @@ -251,6 +251,7 @@ namespace Buffer { } while (buffer_running && SS.connected() && conf.is_active){ + fprintf( stderr, "Still running\n" ); //check for new connections, accept them if there are any //starts a thread for every accepted connection incoming = SS.accept(true); diff --git a/src/buffer_user.cpp b/src/buffer_user.cpp index 4d20dc5d..b58ecfdb 100644 --- a/src/buffer_user.cpp +++ b/src/buffer_user.cpp @@ -66,11 +66,25 @@ bool Buffer::user::Send(){ } //cancel if not connected if (myRing->waiting){ Stream::get()->waitForData(); - if( myRing->updated ) { - Stream::get()->getReadLock(); - S.SendNow( Stream::get()->getStream()->metadata.toNetPacked() ); - Stream::get()->dropReadLock(); - myRing->updated = false; + if( !myRing->waiting ) { + if (Stream::get()->getStream()->getPacket(myRing->b).isMember("keyframe") && myRing->playCount > 0){ + myRing->playCount --; + if (!myRing->playCount){ + fprintf( stderr, "Sending Pausemark\n" ); + JSON::Value pausemark; + pausemark["datatype"] = "pause_marker"; + pausemark["time"] = Stream::get()->getStream()->getPacket(myRing->b)["time"].asInt(); + pausemark.toPacked(); + S.SendNow(pausemark.toNetPacked()); + } + } + if (myRing->updated){ + fprintf( stderr, "Sent new metadata\n" ); + Stream::get()->getReadLock(); + S.SendNow( Stream::get()->getStream()->metadata.toNetPacked() ); + Stream::get()->dropReadLock(); + myRing->updated = false; + } } return false; } //still waiting for next buffer? @@ -92,16 +106,28 @@ bool Buffer::user::Send(){ } //no next buffer? go in waiting mode. myRing->b--; if( myRing->updated ) { + fprintf( stderr, "Sent new metadata\n" ); Stream::get()->getReadLock(); S.SendNow( Stream::get()->getStream()->metadata.toNetPacked() ); Stream::get()->dropReadLock(); myRing->updated = false; } Stream::get()->dropReadLock(); - return true; - } //completed a send + if (Stream::get()->getStream()->getPacket(myRing->b).isMember("keyframe") && myRing->playCount > 0){ + myRing->playCount --; + if (!myRing->playCount){ + fprintf( stderr, "Sending Pausemark\n" ); + JSON::Value pausemark; + pausemark["datatype"] = "pause_marker"; + pausemark["time"] = Stream::get()->getStream()->getPacket(myRing->b)["time"].asInt(); + pausemark.toPacked(); + S.SendNow(pausemark.toNetPacked()); + } + } + return false; + }//completed a send Stream::get()->dropReadLock(); - return false; + return true; } //send /// Default constructor - should not be in use. diff --git a/src/conn_http_dynamic.cpp b/src/conn_http_dynamic.cpp index dd339add..7d3ea274 100644 --- a/src/conn_http_dynamic.cpp +++ b/src/conn_http_dynamic.cpp @@ -131,11 +131,10 @@ namespace Connector_HTTP { "streaming\n" "" + Base64::encode(GenerateBootstrap(MovieId, metadata, 1, 0, 0)) + "\n" - "\n" - "AgAKb25NZXRhRGF0YQgAAAAAAAl0cmFja2luZm8KAAAAAgMACXRpbWVzY2FsZQBA+GoAAAAAAAAGbGVuZ3RoAEGMcHoQAAAAAAhsYW5ndWFnZQIAA2VuZwARc2FtcGxlZGVzY3JpcHRpb24KAAAAAQMACnNhbXBsZXR5cGUCAARhdmMxAAAJAAAJAwAJdGltZXNjYWxlAEDncAAAAAAAAAZsZW5ndGgAQXtNvTAAAAAACGxhbmd1YWdlAgADZW5nABFzYW1wbGVkZXNjcmlwdGlvbgoAAAABAwAKc2FtcGxldHlwZQIABG1wNGEAAAkAAAkADWF1ZGlvY2hhbm5lbHMAQAAAAAAAAAAAD2F1ZGlvc2FtcGxlcmF0ZQBA53AAAAAAAAAOdmlkZW9mcmFtZXJhdGUAQDf/gi5SciUABmFhY2FvdABAAAAAAAAAAAAIYXZjbGV2ZWwAQD8AAAAAAAAACmF2Y3Byb2ZpbGUAQFNAAAAAAAAADGF1ZGlvY29kZWNpZAIABG1wNGEADHZpZGVvY29kZWNpZAIABGF2YzEABXdpZHRoAECQ4AAAAAAAAAZoZWlnaHQAQIMAAAAAAAAACmZyYW1lV2lkdGgAQJDgAAAAAAAAC2ZyYW1lSGVpZ2h0AECDAAAAAAAAAAxkaXNwbGF5V2lkdGgAQJDgAAAAAAAADWRpc3BsYXlIZWlnaHQAQIMAAAAAAAAADG1vb3Zwb3NpdGlvbgBBmxq2uAAAAAAIZHVyYXRpb24AQIKjqW3oyhIAAAk=\n" - "\n" - "\n"; + "\n" + "AgAKb25NZXRhRGF0YQgAAAAAAAl0cmFja2luZm8KAAAAAgMACXRpbWVzY2FsZQBA+GoAAAAAAAAGbGVuZ3RoAEGMcHoQAAAAAAhsYW5ndWFnZQIAA2VuZwARc2FtcGxlZGVzY3JpcHRpb24KAAAAAQMACnNhbXBsZXR5cGUCAARhdmMxAAAJAAAJAwAJdGltZXNjYWxlAEDncAAAAAAAAAZsZW5ndGgAQXtNvTAAAAAACGxhbmd1YWdlAgADZW5nABFzYW1wbGVkZXNjcmlwdGlvbgoAAAABAwAKc2FtcGxldHlwZQIABG1wNGEAAAkAAAkADWF1ZGlvY2hhbm5lbHMAQAAAAAAAAAAAD2F1ZGlvc2FtcGxlcmF0ZQBA53AAAAAAAAAOdmlkZW9mcmFtZXJhdGUAQDf/gi5SciUABmFhY2FvdABAAAAAAAAAAAAIYXZjbGV2ZWwAQD8AAAAAAAAACmF2Y3Byb2ZpbGUAQFNAAAAAAAAADGF1ZGlvY29kZWNpZAIABG1wNGEADHZpZGVvY29kZWNpZAIABGF2YzEABXdpZHRoAECQ4AAAAAAAAAZoZWlnaHQAQIMAAAAAAAAACmZyYW1lV2lkdGgAQJDgAAAAAAAAC2ZyYW1lSGVpZ2h0AECDAAAAAAAAAAxkaXNwbGF5V2lkdGgAQJDgAAAAAAAADWRpc3BsYXlIZWlnaHQAQIMAAAAAAAAADG1vb3Zwb3NpdGlvbgBBmxq2uAAAAAAIZHVyYXRpb24AQIKjqW3oyhIAAAk=\n" + "\n" + "\n"; }else{ Result = "\n" "\n"