Progressive now works with continuously updated metadata
This commit is contained in:
parent
6644132762
commit
ddf4983836
3 changed files with 111 additions and 85 deletions
145
src/buffer.cpp
145
src/buffer.cpp
|
@ -64,71 +64,76 @@ namespace Buffer {
|
||||||
|
|
||||||
while (usr->S.connected()){
|
while (usr->S.connected()){
|
||||||
usleep(5000); //sleep 5ms
|
usleep(5000); //sleep 5ms
|
||||||
if( !usr->Send()){
|
if( !usr->myRing->playCount || !usr->Send()){
|
||||||
if (usr->S.spool()){
|
if (usr->S.spool()){
|
||||||
while (usr->S.Received().size()){
|
while (usr->S.Received().size()){
|
||||||
//delete anything that doesn't end with a newline
|
//delete anything that doesn't end with a newline
|
||||||
if ( !usr->S.Received().get().empty() && *(usr->S.Received().get().rbegin()) != '\n'){
|
if ( !usr->S.Received().get().empty() && *(usr->S.Received().get().rbegin()) != '\n'){
|
||||||
usr->S.Received().get().clear();
|
usr->S.Received().get().clear();
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
usr->S.Received().get().resize(usr->S.Received().get().size() - 1);
|
usr->S.Received().get().resize(usr->S.Received().get().size() - 1);
|
||||||
if ( !usr->S.Received().get().empty()){
|
if ( !usr->S.Received().get().empty()){
|
||||||
switch (usr->S.Received().get()[0]){
|
switch (usr->S.Received().get()[0]){
|
||||||
case 'P': { //Push
|
case 'P': { //Push
|
||||||
std::cout << "Push attempt from IP " << usr->S.Received().get().substr(2) << std::endl;
|
std::cout << "Push attempt from IP " << usr->S.Received().get().substr(2) << std::endl;
|
||||||
if (thisStream->checkWaitingIP(usr->S.Received().get().substr(2))){
|
if (thisStream->checkWaitingIP(usr->S.Received().get().substr(2))){
|
||||||
usr->S.Received().get().clear();
|
usr->S.Received().get().clear();
|
||||||
if (thisStream->setInput(usr->S)){
|
if (thisStream->setInput(usr->S)){
|
||||||
std::cout << "Push accepted!" << std::endl;
|
std::cout << "Push accepted!" << std::endl;
|
||||||
usr->S = Socket::Connection( -1);
|
usr->S = Socket::Connection( -1);
|
||||||
return;
|
return;
|
||||||
}else{
|
}else{
|
||||||
usr->Disconnect("Push denied - push already in progress!");
|
usr->Disconnect("Push denied - push already in progress!");
|
||||||
}
|
}
|
||||||
}else{
|
}else{
|
||||||
usr->Disconnect("Push denied - invalid IP address!");
|
usr->Disconnect("Push denied - invalid IP address!");
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case 'S': { //Stats
|
case 'S': { //Stats
|
||||||
usr->tmpStats = Stats(usr->S.Received().get().substr(2));
|
usr->tmpStats = Stats(usr->S.Received().get().substr(2));
|
||||||
unsigned int secs = usr->tmpStats.conntime - usr->lastStats.conntime;
|
unsigned int secs = usr->tmpStats.conntime - usr->lastStats.conntime;
|
||||||
if (secs < 1){
|
if (secs < 1){
|
||||||
secs = 1;
|
secs = 1;
|
||||||
}
|
}
|
||||||
usr->curr_up = (usr->tmpStats.up - usr->lastStats.up) / secs;
|
usr->curr_up = (usr->tmpStats.up - usr->lastStats.up) / secs;
|
||||||
usr->curr_down = (usr->tmpStats.down - usr->lastStats.down) / secs;
|
usr->curr_down = (usr->tmpStats.down - usr->lastStats.down) / secs;
|
||||||
usr->lastStats = usr->tmpStats;
|
usr->lastStats = usr->tmpStats;
|
||||||
thisStream->saveStats(usr->MyStr, usr->tmpStats);
|
thisStream->saveStats(usr->MyStr, usr->tmpStats);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case 's': { //second-seek
|
case 's': { //second-seek
|
||||||
unsigned int ms = JSON::Value(usr->S.Received().get().substr(2)).asInt();
|
unsigned int ms = JSON::Value(usr->S.Received().get().substr(2)).asInt();
|
||||||
usr->myRing.waiting = false;
|
usr->myRing->waiting = false;
|
||||||
usr->myRing.starved = false;
|
usr->myRing->starved = false;
|
||||||
usr->myRing.b = thisStream->getStream()->msSeek(ms);
|
usr->myRing->b = thisStream->getStream()->msSeek(ms);
|
||||||
break;
|
if (usr->myRing->playCount > 0 ) {
|
||||||
}
|
usr->myRing->playCount = 0;
|
||||||
case 'f': { //frame-seek
|
}
|
||||||
//ignored for now
|
break;
|
||||||
break;
|
}
|
||||||
}
|
case 'f': { //frame-seek
|
||||||
case 'p': { //play
|
//ignored for now
|
||||||
//ignored for now
|
break;
|
||||||
break;
|
}
|
||||||
}
|
case 'p': { //play
|
||||||
case 'o': { //once-play
|
usr->myRing->playCount = -1;
|
||||||
//ignored for now
|
break;
|
||||||
break;
|
}
|
||||||
}
|
case 'o': { //once-play
|
||||||
case 'q': { //quit-playing
|
if (usr->myRing->playCount >= 0 ) {
|
||||||
//ignored for now
|
usr->myRing->playCount++;
|
||||||
break;
|
}
|
||||||
}
|
break;
|
||||||
|
}
|
||||||
|
case 'q': { //quit-playing
|
||||||
|
usr->myRing->playCount = 0;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
usr->S.Received().get().clear();
|
||||||
}
|
}
|
||||||
usr->S.Received().get().clear();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -152,30 +157,22 @@ namespace Buffer {
|
||||||
//slow down packet receiving to real-time
|
//slow down packet receiving to real-time
|
||||||
now = getNowMS();
|
now = getNowMS();
|
||||||
if ((now - timeDiff >= lastPacket) || (lastPacket - (now - timeDiff) > 15000)){
|
if ((now - timeDiff >= lastPacket) || (lastPacket - (now - timeDiff) > 15000)){
|
||||||
fprintf( stderr, "Obtaining write lock... " );
|
|
||||||
thisStream->getWriteLock();
|
thisStream->getWriteLock();
|
||||||
fprintf( stderr, "Done.\n" );
|
|
||||||
if (thisStream->getStream()->parsePacket(inBuffer)){
|
if (thisStream->getStream()->parsePacket(inBuffer)){
|
||||||
fprintf( stderr, "Receiving a packet... " );
|
|
||||||
thisStream->getStream()->outPacket(0);
|
thisStream->getStream()->outPacket(0);
|
||||||
lastPacket = thisStream->getStream()->getTime();
|
lastPacket = thisStream->getStream()->getTime();
|
||||||
if ((now - timeDiff - lastPacket) > 15000 || (now - timeDiff - lastPacket < -15000)){
|
if ((now - timeDiff - lastPacket) > 15000 || (now - timeDiff - lastPacket < -15000)){
|
||||||
timeDiff = now - lastPacket;
|
timeDiff = now - lastPacket;
|
||||||
}
|
}
|
||||||
thisStream->dropWriteLock(true);
|
thisStream->dropWriteLock(true);
|
||||||
fprintf( stderr, "Done.\n" );
|
|
||||||
}else{
|
}else{
|
||||||
fprintf( stderr, "Not receiving a packet... " );
|
|
||||||
thisStream->dropWriteLock(false);
|
thisStream->dropWriteLock(false);
|
||||||
std::cin.read(charBuffer, 1024 * 10);
|
std::cin.read(charBuffer, 1024 * 10);
|
||||||
charCount = std::cin.gcount();
|
charCount = std::cin.gcount();
|
||||||
inBuffer.append(charBuffer, charCount);
|
inBuffer.append(charBuffer, charCount);
|
||||||
fprintf( stderr, "Done.\n" );
|
|
||||||
}
|
}
|
||||||
}else{
|
}else{
|
||||||
fprintf( stderr, "Sleeping: %d...", std::min(14999LL, lastPacket - (now - timeDiff)) * 1000 );
|
|
||||||
usleep(std::min(14999LL, lastPacket - (now - timeDiff)) * 1000);
|
usleep(std::min(14999LL, lastPacket - (now - timeDiff)) * 1000);
|
||||||
fprintf( stderr, "Done.\n" );
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
buffer_running = false;
|
buffer_running = false;
|
||||||
|
@ -222,6 +219,8 @@ namespace Buffer {
|
||||||
"{\"arg_num\":2, \"arg\":\"string\", \"default\":\"\", \"help\":\"IP address to expect incoming data from. This will completely disable reading from standard input if used.\"}"));
|
"{\"arg_num\":2, \"arg\":\"string\", \"default\":\"\", \"help\":\"IP address to expect incoming data from. This will completely disable reading from standard input if used.\"}"));
|
||||||
conf.addOption("reportstats",
|
conf.addOption("reportstats",
|
||||||
JSON::fromString("{\"default\":0, \"help\":\"Report stats to a controller process.\", \"short\":\"s\", \"long\":\"reportstats\"}"));
|
JSON::fromString("{\"default\":0, \"help\":\"Report stats to a controller process.\", \"short\":\"s\", \"long\":\"reportstats\"}"));
|
||||||
|
conf.addOption("time",
|
||||||
|
JSON::fromString("{\"default\":0, \"arg\": \"integer\", \"help\":\"Buffer a specied amount of time in ms.\", \"short\":\"t\", \"long\":\"time\"}"));
|
||||||
conf.parseArgs(argc, argv);
|
conf.parseArgs(argc, argv);
|
||||||
|
|
||||||
std::string name = conf.getString("stream_name");
|
std::string name = conf.getString("stream_name");
|
||||||
|
@ -234,6 +233,7 @@ namespace Buffer {
|
||||||
conf.activate();
|
conf.activate();
|
||||||
thisStream = Stream::get();
|
thisStream = Stream::get();
|
||||||
thisStream->setName(name);
|
thisStream->setName(name);
|
||||||
|
thisStream->getStream()->setBufferTime(conf.getInteger("time"));
|
||||||
Socket::Connection incoming;
|
Socket::Connection incoming;
|
||||||
Socket::Connection std_input(fileno(stdin));
|
Socket::Connection std_input(fileno(stdin));
|
||||||
|
|
||||||
|
@ -251,6 +251,7 @@ namespace Buffer {
|
||||||
}
|
}
|
||||||
|
|
||||||
while (buffer_running && SS.connected() && conf.is_active){
|
while (buffer_running && SS.connected() && conf.is_active){
|
||||||
|
fprintf( stderr, "Still running\n" );
|
||||||
//check for new connections, accept them if there are any
|
//check for new connections, accept them if there are any
|
||||||
//starts a thread for every accepted connection
|
//starts a thread for every accepted connection
|
||||||
incoming = SS.accept(true);
|
incoming = SS.accept(true);
|
||||||
|
|
|
@ -66,11 +66,25 @@ bool Buffer::user::Send(){
|
||||||
} //cancel if not connected
|
} //cancel if not connected
|
||||||
if (myRing->waiting){
|
if (myRing->waiting){
|
||||||
Stream::get()->waitForData();
|
Stream::get()->waitForData();
|
||||||
if( myRing->updated ) {
|
if( !myRing->waiting ) {
|
||||||
Stream::get()->getReadLock();
|
if (Stream::get()->getStream()->getPacket(myRing->b).isMember("keyframe") && myRing->playCount > 0){
|
||||||
S.SendNow( Stream::get()->getStream()->metadata.toNetPacked() );
|
myRing->playCount --;
|
||||||
Stream::get()->dropReadLock();
|
if (!myRing->playCount){
|
||||||
myRing->updated = false;
|
fprintf( stderr, "Sending Pausemark\n" );
|
||||||
|
JSON::Value pausemark;
|
||||||
|
pausemark["datatype"] = "pause_marker";
|
||||||
|
pausemark["time"] = Stream::get()->getStream()->getPacket(myRing->b)["time"].asInt();
|
||||||
|
pausemark.toPacked();
|
||||||
|
S.SendNow(pausemark.toNetPacked());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (myRing->updated){
|
||||||
|
fprintf( stderr, "Sent new metadata\n" );
|
||||||
|
Stream::get()->getReadLock();
|
||||||
|
S.SendNow( Stream::get()->getStream()->metadata.toNetPacked() );
|
||||||
|
Stream::get()->dropReadLock();
|
||||||
|
myRing->updated = false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
} //still waiting for next buffer?
|
} //still waiting for next buffer?
|
||||||
|
@ -92,16 +106,28 @@ bool Buffer::user::Send(){
|
||||||
} //no next buffer? go in waiting mode.
|
} //no next buffer? go in waiting mode.
|
||||||
myRing->b--;
|
myRing->b--;
|
||||||
if( myRing->updated ) {
|
if( myRing->updated ) {
|
||||||
|
fprintf( stderr, "Sent new metadata\n" );
|
||||||
Stream::get()->getReadLock();
|
Stream::get()->getReadLock();
|
||||||
S.SendNow( Stream::get()->getStream()->metadata.toNetPacked() );
|
S.SendNow( Stream::get()->getStream()->metadata.toNetPacked() );
|
||||||
Stream::get()->dropReadLock();
|
Stream::get()->dropReadLock();
|
||||||
myRing->updated = false;
|
myRing->updated = false;
|
||||||
}
|
}
|
||||||
Stream::get()->dropReadLock();
|
Stream::get()->dropReadLock();
|
||||||
return true;
|
if (Stream::get()->getStream()->getPacket(myRing->b).isMember("keyframe") && myRing->playCount > 0){
|
||||||
} //completed a send
|
myRing->playCount --;
|
||||||
|
if (!myRing->playCount){
|
||||||
|
fprintf( stderr, "Sending Pausemark\n" );
|
||||||
|
JSON::Value pausemark;
|
||||||
|
pausemark["datatype"] = "pause_marker";
|
||||||
|
pausemark["time"] = Stream::get()->getStream()->getPacket(myRing->b)["time"].asInt();
|
||||||
|
pausemark.toPacked();
|
||||||
|
S.SendNow(pausemark.toNetPacked());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}//completed a send
|
||||||
Stream::get()->dropReadLock();
|
Stream::get()->dropReadLock();
|
||||||
return false;
|
return true;
|
||||||
} //send
|
} //send
|
||||||
|
|
||||||
/// Default constructor - should not be in use.
|
/// Default constructor - should not be in use.
|
||||||
|
|
|
@ -131,11 +131,10 @@ namespace Connector_HTTP {
|
||||||
"<deliveryType>streaming</deliveryType>\n"
|
"<deliveryType>streaming</deliveryType>\n"
|
||||||
"<bootstrapInfo profile=\"named\" id=\"bootstrap1\">" + Base64::encode(GenerateBootstrap(MovieId, metadata, 1, 0, 0))
|
"<bootstrapInfo profile=\"named\" id=\"bootstrap1\">" + Base64::encode(GenerateBootstrap(MovieId, metadata, 1, 0, 0))
|
||||||
+ "</bootstrapInfo>\n"
|
+ "</bootstrapInfo>\n"
|
||||||
"<media streamId=\"1\" bootstrapInfoId=\"bootstrap1\" url=\"" + MovieId
|
"<media streamId=\"1\" bootstrapInfoId=\"bootstrap1\" url=\"" + MovieId + "/\">\n"
|
||||||
+ "/\">\n"
|
"<metadata>AgAKb25NZXRhRGF0YQgAAAAAAAl0cmFja2luZm8KAAAAAgMACXRpbWVzY2FsZQBA+GoAAAAAAAAGbGVuZ3RoAEGMcHoQAAAAAAhsYW5ndWFnZQIAA2VuZwARc2FtcGxlZGVzY3JpcHRpb24KAAAAAQMACnNhbXBsZXR5cGUCAARhdmMxAAAJAAAJAwAJdGltZXNjYWxlAEDncAAAAAAAAAZsZW5ndGgAQXtNvTAAAAAACGxhbmd1YWdlAgADZW5nABFzYW1wbGVkZXNjcmlwdGlvbgoAAAABAwAKc2FtcGxldHlwZQIABG1wNGEAAAkAAAkADWF1ZGlvY2hhbm5lbHMAQAAAAAAAAAAAD2F1ZGlvc2FtcGxlcmF0ZQBA53AAAAAAAAAOdmlkZW9mcmFtZXJhdGUAQDf/gi5SciUABmFhY2FvdABAAAAAAAAAAAAIYXZjbGV2ZWwAQD8AAAAAAAAACmF2Y3Byb2ZpbGUAQFNAAAAAAAAADGF1ZGlvY29kZWNpZAIABG1wNGEADHZpZGVvY29kZWNpZAIABGF2YzEABXdpZHRoAECQ4AAAAAAAAAZoZWlnaHQAQIMAAAAAAAAACmZyYW1lV2lkdGgAQJDgAAAAAAAAC2ZyYW1lSGVpZ2h0AECDAAAAAAAAAAxkaXNwbGF5V2lkdGgAQJDgAAAAAAAADWRpc3BsYXlIZWlnaHQAQIMAAAAAAAAADG1vb3Zwb3NpdGlvbgBBmxq2uAAAAAAIZHVyYXRpb24AQIKjqW3oyhIAAAk=</metadata>\n"
|
||||||
"<metadata>AgAKb25NZXRhRGF0YQgAAAAAAAl0cmFja2luZm8KAAAAAgMACXRpbWVzY2FsZQBA+GoAAAAAAAAGbGVuZ3RoAEGMcHoQAAAAAAhsYW5ndWFnZQIAA2VuZwARc2FtcGxlZGVzY3JpcHRpb24KAAAAAQMACnNhbXBsZXR5cGUCAARhdmMxAAAJAAAJAwAJdGltZXNjYWxlAEDncAAAAAAAAAZsZW5ndGgAQXtNvTAAAAAACGxhbmd1YWdlAgADZW5nABFzYW1wbGVkZXNjcmlwdGlvbgoAAAABAwAKc2FtcGxldHlwZQIABG1wNGEAAAkAAAkADWF1ZGlvY2hhbm5lbHMAQAAAAAAAAAAAD2F1ZGlvc2FtcGxlcmF0ZQBA53AAAAAAAAAOdmlkZW9mcmFtZXJhdGUAQDf/gi5SciUABmFhY2FvdABAAAAAAAAAAAAIYXZjbGV2ZWwAQD8AAAAAAAAACmF2Y3Byb2ZpbGUAQFNAAAAAAAAADGF1ZGlvY29kZWNpZAIABG1wNGEADHZpZGVvY29kZWNpZAIABGF2YzEABXdpZHRoAECQ4AAAAAAAAAZoZWlnaHQAQIMAAAAAAAAACmZyYW1lV2lkdGgAQJDgAAAAAAAAC2ZyYW1lSGVpZ2h0AECDAAAAAAAAAAxkaXNwbGF5V2lkdGgAQJDgAAAAAAAADWRpc3BsYXlIZWlnaHQAQIMAAAAAAAAADG1vb3Zwb3NpdGlvbgBBmxq2uAAAAAAIZHVyYXRpb24AQIKjqW3oyhIAAAk=</metadata>\n"
|
"</media>\n"
|
||||||
"</media>\n"
|
"</manifest>\n";
|
||||||
"</manifest>\n";
|
|
||||||
}else{
|
}else{
|
||||||
Result = "<?xml version=\"1.0\" encoding=\"utf-8\"?>\n"
|
Result = "<?xml version=\"1.0\" encoding=\"utf-8\"?>\n"
|
||||||
"<manifest xmlns=\"http://ns.adobe.com/f4m/1.0\">\n"
|
"<manifest xmlns=\"http://ns.adobe.com/f4m/1.0\">\n"
|
||||||
|
|
Loading…
Add table
Reference in a new issue