From 5165aae7e377960e85049cd441e94e790939861f Mon Sep 17 00:00:00 2001 From: Thulinma Date: Fri, 31 Aug 2012 12:35:51 +0200 Subject: [PATCH] Update to buffer/player <-> connector protocol, now more in-line with established behaviour. Seeking support in HTTP dynamic and progressive. --- src/Makefile.am | 2 +- src/buffer.cpp | 67 ++++--- src/conn_http.cpp | 10 +- src/conn_http_dynamic.cpp | 3 +- src/conn_http_progressive.cpp | 7 +- src/conn_rtmp.cpp | 2 +- src/player.cpp | 320 ++++++++++------------------------ src/player.h | 28 --- 8 files changed, 146 insertions(+), 293 deletions(-) delete mode 100644 src/player.h diff --git a/src/Makefile.am b/src/Makefile.am index a09808dd..a93ff751 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -18,7 +18,7 @@ MistConnHTTP_SOURCES=conn_http.cpp tinythread.cpp tinythread.h ../VERSION ./embe MistConnHTTP_LDADD=$(MIST_LIBS) -lpthread MistConnHTTPProgressive_SOURCES=conn_http_progressive.cpp ../VERSION MistConnHTTPDynamic_SOURCES=conn_http_dynamic.cpp ../VERSION -MistPlayer_SOURCES=player.h player.cpp +MistPlayer_SOURCES=player.cpp MistPlayer_LDADD=$(MIST_LIBS) lspSOURCES=$(lspdir)/jquery.js $(lspdir)/placeholder.js $(lspdir)/md5.js $(lspdir)/main.js $(lspdir)/functions.js diff --git a/src/buffer.cpp b/src/buffer.cpp index fecab00c..f651840a 100644 --- a/src/buffer.cpp +++ b/src/buffer.cpp @@ -63,28 +63,45 @@ namespace Buffer{ std::string cmd = usr->S.Received().substr(0, usr->S.Received().find('\n')); usr->S.Received().erase(0, usr->S.Received().find('\n')+1); if (cmd != ""){ - if (cmd[0] == 'P'){ - std::cout << "Push attempt from IP " << cmd.substr(2) << std::endl; - if (thisStream->checkWaitingIP(cmd.substr(2))){ - if (thisStream->setInput(usr->S)){ - std::cout << "Push accepted!" << std::endl; - usr->S = Socket::Connection(-1); - return; + switch (cmd[0]){ + case 'P':{ //Push + std::cout << "Push attempt from IP " << cmd.substr(2) << std::endl; + if (thisStream->checkWaitingIP(cmd.substr(2))){ + if (thisStream->setInput(usr->S)){ + std::cout << "Push accepted!" << std::endl; + usr->S = Socket::Connection(-1); + return; + }else{ + usr->Disconnect("Push denied - push already in progress!"); + } }else{ - usr->Disconnect("Push denied - push already in progress!"); + usr->Disconnect("Push denied - invalid IP address!"); } - }else{ - usr->Disconnect("Push denied - invalid IP address!"); - } - } - if (cmd[0] == 'S'){ - usr->tmpStats = Stats(cmd.substr(2)); - unsigned int secs = usr->tmpStats.conntime - usr->lastStats.conntime; - if (secs < 1){secs = 1;} - usr->curr_up = (usr->tmpStats.up - usr->lastStats.up) / secs; - usr->curr_down = (usr->tmpStats.down - usr->lastStats.down) / secs; - usr->lastStats = usr->tmpStats; - thisStream->saveStats(usr->MyStr, usr->tmpStats); + } break; + case 'S':{ //Stats + usr->tmpStats = Stats(cmd.substr(2)); + unsigned int secs = usr->tmpStats.conntime - usr->lastStats.conntime; + if (secs < 1){secs = 1;} + usr->curr_up = (usr->tmpStats.up - usr->lastStats.up) / secs; + usr->curr_down = (usr->tmpStats.down - usr->lastStats.down) / secs; + usr->lastStats = usr->tmpStats; + thisStream->saveStats(usr->MyStr, usr->tmpStats); + } break; + case 's':{ //second-seek + //ignored for now + } break; + case 'f':{ //frame-seek + //ignored for now + } break; + case 'p':{ //play + //ignored for now + } break; + case 'o':{ //once-play + //ignored for now + } break; + case 'q':{ //quit-playing + //ignored for now + } break; } } } @@ -107,12 +124,12 @@ namespace Buffer{ while (std::cin.good() && buffer_running){ //slow down packet receiving to real-time now = getNowMS(); - if ((now - timeDiff >= lastPacket) || (lastPacket - (now - timeDiff) > 5000)){ + if ((now - timeDiff >= lastPacket) || (lastPacket - (now - timeDiff) > 15000)){ thisStream->getWriteLock(); if (thisStream->getStream()->parsePacket(inBuffer)){ thisStream->getStream()->outPacket(0); lastPacket = thisStream->getStream()->getTime(); - if ((now - timeDiff - lastPacket) > 5000 || (now - timeDiff - lastPacket < -5000)){ + if ((now - timeDiff - lastPacket) > 15000 || (now - timeDiff - lastPacket < -15000)){ timeDiff = now - lastPacket; } thisStream->dropWriteLock(true); @@ -123,11 +140,7 @@ namespace Buffer{ inBuffer.append(charBuffer, charCount); } }else{ - if ((lastPacket - (now - timeDiff)) > 999){ - usleep(999000); - }else{ - usleep((lastPacket - (now - timeDiff)) * 1000); - } + usleep(std::min(14999LL, lastTime - (now - timeDiff)) * 1000); } } buffer_running = false; diff --git a/src/conn_http.cpp b/src/conn_http.cpp index cbd866a8..09af0847 100644 --- a/src/conn_http.cpp +++ b/src/conn_http.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include "tinythread.h" #include "embed.js.h" @@ -120,6 +121,7 @@ namespace Connector_HTTP{ }else{ streamname = url.substr(7, url.length() - 10); } + Util::Stream::sanitizeName(streamname); JSON::Value ServConf = JSON::fromFile("/tmp/mist/streamlist"); std::string response; std::string host = H.GetHeader("Host"); @@ -292,13 +294,17 @@ namespace Connector_HTTP{ /// - progressive (request fed from http_progressive connector) std::string getHTTPType(HTTP::Parser & H){ if ((H.url.find("f4m") != std::string::npos) || ((H.url.find("Seg") != std::string::npos) && (H.url.find("Frag") != std::string::npos))){ - H.SetVar("stream", H.url.substr(1,H.url.find("/",1)-1)); + std::string streamname = H.url.substr(1,H.url.find("/",1)-1); + Util::Stream::sanitizeName(streamname); + H.SetVar("stream", streamname); return "dynamic"; } if (H.url.length() > 4){ std::string ext = H.url.substr(H.url.length() - 4, 4); if (ext == ".flv" || ext == ".mp3"){ - H.SetVar("stream", H.url.substr(1,H.url.length() - 5)); + std::string streamname = H.url.substr(1,H.url.length() - 5); + Util::Stream::sanitizeName(streamname); + H.SetVar("stream", streamname); return "progressive"; } } diff --git a/src/conn_http_dynamic.cpp b/src/conn_http_dynamic.cpp index 8014ef56..85ce94fa 100644 --- a/src/conn_http_dynamic.cpp +++ b/src/conn_http_dynamic.cpp @@ -153,6 +153,8 @@ namespace Connector_HTTP{ #if DEBUG >= 4 printf( "Quality: %s, Seg %d Frag %d\n", Quality.c_str(), Segment, ReqFragment); #endif + ss.Send("f " + JSON::Value((long long int)ReqFragment) + "\no \n"); + ss.flush(); Flash_RequestPending++; }else{ streamname = HTTP_R.url.substr(1,HTTP_R.url.find("/",1)-1); @@ -184,7 +186,6 @@ namespace Connector_HTTP{ #if DEBUG >= 3 fprintf(stderr, "Everything connected, starting to send video data...\n"); #endif - ss.Send("play\n");ss.flush(); inited = true; } if ((Flash_RequestPending > 0) && !Flash_FragBuffer.empty()){ diff --git a/src/conn_http_progressive.cpp b/src/conn_http_progressive.cpp index e7b4cfbf..81f7e955 100644 --- a/src/conn_http_progressive.cpp +++ b/src/conn_http_progressive.cpp @@ -76,13 +76,14 @@ namespace Connector_HTTP{ } if (seek_pos){ std::stringstream cmd; - cmd << "seek " << seek_pos << "\n"; - ss.Send(cmd.str().c_str()); + cmd << "s " << seek_pos << "\n"; + ss.Send(cmd.str()); } #if DEBUG >= 3 fprintf(stderr, "Everything connected, starting to send video data...\n"); #endif - ss.Send("play\n");ss.flush(); + ss.Send("p\n"); + ss.flush(); inited = true; } unsigned int now = time(0); diff --git a/src/conn_rtmp.cpp b/src/conn_rtmp.cpp index 10554b94..e68c1a5b 100644 --- a/src/conn_rtmp.cpp +++ b/src/conn_rtmp.cpp @@ -94,7 +94,7 @@ int Connector_RTMP::Connector_RTMP(Socket::Connection conn){ #if DEBUG >= 3 fprintf(stderr, "Everything connected, starting to send video data...\n"); #endif - SS.Send("play\n");SS.flush(); + SS.Send("p\n");SS.flush(); inited = true; } if (inited && !nostats){ diff --git a/src/player.cpp b/src/player.cpp index 84571ef1..8e36d434 100644 --- a/src/player.cpp +++ b/src/player.cpp @@ -1,239 +1,99 @@ +/// \file player.cpp +/// Holds all code for the MistPlayer application used for VoD streams. -#include -#include -#include -#include -#include -#include +#include //for fileno #include #include -#include "player.h" +#include +#include +#include -namespace Player{ - ///\todo Make getNowMS available in a library - /// Gets the current system time in milliseconds. - long long int getNowMS(){ - timeval t; - gettimeofday(&t, 0); - return t.tv_sec * 1000 + t.tv_usec/1000; - }//getNowMS - - void setBlocking(int fd, bool blocking){ - int flags = fcntl(fd, F_GETFL); - if (blocking){ - flags &= ~O_NONBLOCK; - }else{ - flags |= O_NONBLOCK; - } - fcntl(fd, F_SETFL, flags); - } - - File::File(std::string filename){ - stream = new DTSC::Stream(5); - ring = NULL;// ring will be initialized when necessary - fileSrc.open(filename.c_str(), std::ifstream::in | std::ifstream::binary); - setBlocking(STDIN_FILENO, false);//prevent reading from stdin from blocking - std::cout.setf(std::ios::unitbuf);//do not choke - - fileSrc.seekg(0, std::ios::end); - fileSize = fileSrc.tellg(); - fileSrc.seekg(0); - - nextPacket();// initial read always returns nothing - if (!nextPacket()){//parse metadata - std::cout << stream->outHeader(); - } else { - std::cerr << "Error: Expected metadata!" << std::endl; - } - }; - File::~File() { - if (ring) { - stream->dropRing(ring); - ring = NULL; - } - delete stream; - } - /// \returns Number of read bytes or -1 on EOF - int File::fillBuffer(std::string & buffer){ - char buff[1024 * 10]; - if (fileSrc.good()){ - fileSrc.read(buff, sizeof(buff)); - buffer.append(buff, fileSrc.gcount()); - return fileSrc.gcount(); - } - return -1; - } - // \returns True if there is a packet available for pull. - bool File::nextPacket(){ - if (stream->parsePacket(inBuffer)){ - return true; - } else { - fillBuffer(inBuffer); - } - return false; - } - void File::seek(unsigned int miliseconds){ - DTSC::Stream * tmpStream = new DTSC::Stream(1); - unsigned long leftByte = 1, rightByte = fileSize; - unsigned int leftMS = 0, rightMS = INT_MAX; - unsigned long foundRange = 0;//leftMS concatenated with rightMS - int lastOccurences = 0;//times that foundRange stayed the same during an iteration - /// \todo set last packet as last byte, consider metadata - while (lastOccurences < 5){//assume that we have found the position if the boundaries do not in 5 times - std::string buffer; - // binary search: pick the first packet on the right - unsigned long medByte = leftByte + (rightByte - leftByte) / 2; - fileSrc.clear();// clear previous IO errors - fileSrc.seekg(medByte); - - do{ // find first occurrence of packet - int read_bytes = fillBuffer(buffer); - if (read_bytes < 0){// EOF? O noes! EOF! - goto seekDone; - } - unsigned long header_pos = buffer.find(DTSC::Magic_Packet); - if (header_pos == std::string::npos){ - // it is possible that the magic packet is partially shown, e.g. "DTP" - if ((unsigned)read_bytes > strlen(DTSC::Magic_Packet) - 1){ - read_bytes -= strlen(DTSC::Magic_Packet) - 1; - buffer.erase(0, read_bytes); - medByte += read_bytes; - } - continue;// continue filling the buffer without parsing packet - } - }while (!tmpStream->parsePacket(buffer)); - JSON::Value & medPacket = tmpStream->getPacket(0); - /// \todo What if time does not exist? Currently it just crashes. - // assumes that the time does not get over 49 days (on a 32-bit system) - unsigned int medMS = (unsigned int)medPacket["time"].asInt(); - - if (medMS > miliseconds){ - rightByte = medByte; - rightMS = medMS; - }else if (medMS < miliseconds){ - leftByte = medByte; - leftMS = medMS; - } - //concatenate leftMS with rightMS - unsigned long sum = ((unsigned long)leftMS << (CHAR_BIT * sizeof(leftMS))) | rightMS; - if (foundRange == sum){++lastOccurences;}else{foundRange = sum;lastOccurences = 0;} - } -seekDone: - // clear the buffer and adjust file pointer - inBuffer.clear(); - fileSrc.seekg(leftByte); - delete tmpStream; - }; - std::string & File::getPacket(){ - static std::string emptystring; - if (ring->waiting){ - return emptystring; - }//still waiting for next buffer? - if (ring->starved){ - //if corrupt data, warn and get new DTSC::Ring - std::cerr << "Warning: User was send corrupt video data and send to the next keyframe!" << std::endl; - stream->dropRing(ring); - ring = stream->getRing(); - return emptystring; - } - //switch to next buffer - if (ring->b < 0){ - ring->waiting = true; - return emptystring; - }//no next buffer? go in waiting mode. - // get current packet - std::string & packet = stream->outPacket(ring->b); - // make next request take a different packet - ring->b--; - return packet; - } - - /// Reads a command from stdin. Returns true if a command was read. - bool File::readCommand() { - char line[512]; - size_t line_len; - if (fgets(line, sizeof(line), stdin) == NULL){ - return false; - } - line[sizeof(line) - 1] = 0;// in case stream is not null-terminated... - line_len = strlen(line); - if (line[line_len - 1] == '\n'){ - line[--line_len] = 0; - } - { - unsigned int position = INT_MAX;// special value that says "invalid" - if (!strncmp("seek ", line, sizeof("seek ") - 1)){ - position = atoi(line + sizeof("seek ") - 1); - } - if (!strncmp("relseek ", line, sizeof("relseek " - 1))){ - /// \todo implement relseek in a smart way - //position = atoi(line + sizeof("relseek ")); - } - if (position != INT_MAX){ - File::seek(position); - inBuffer.clear();//clear buffered data from file - return true; - } - } - if (!strncmp("byteseek ", line, sizeof("byteseek " - 1))){ - std::streampos byte = atoi(line + sizeof("byteseek ")); - fileSrc.seekg(byte);//if EOF, then it's the client's fault, ignore it. - inBuffer.clear();//clear buffered data from file - return true; - } - if (!strcmp("play", line)){ - playing = true; - } - if (!strcmp("pause", line)){ - playing = false; - } - return false; - } - - void File::Play() { - long long now, timeDiff = 0, lastTime = 0; - while (fileSrc.good() || !inBuffer.empty()) { - if (readCommand()) { - continue; - } - if (!playing){ - setBlocking(STDIN_FILENO, true); - continue; - }else{ - setBlocking(STDIN_FILENO, false); - } - now = getNowMS(); - if (now - timeDiff >= lastTime || lastTime - (now - timeDiff) > 5000) { - if (nextPacket()) { - if (!ring){ring = stream->getRing();}//get ring after reading first non-metadata - std::string & packet = getPacket(); - if (packet.empty()){ - continue; - } - lastTime = stream->getTime(); - if (std::abs(now - timeDiff - lastTime) > 5000) { - timeDiff = now - lastTime; - } - std::cout.write(packet.c_str(), packet.length()); - } - } else { - usleep(std::min(999LL, lastTime - (now - timeDiff)) * 1000); - } - } - } -}; +/// Gets the current system time in milliseconds. +long long int getNowMS(){ + timeval t; + gettimeofday(&t, 0); + return t.tv_sec * 1000 + t.tv_usec/1000; +}//getNowMS int main(int argc, char** argv){ - if (argc < 2){ - std::cerr << "Usage: " << argv[0] << " filename.dtsc" << std::endl; - return 1; + Util::Config conf(argv[0], PACKAGE_VERSION); + conf.addOption("filename", JSON::fromString("{\"arg_num\":1, \"help\":\"Name of the file to write to stdout.\"}")); + conf.parseArgs(argc, argv); + conf.activate(); + int playing = 0; + + DTSC::File source = DTSC::File(conf.getString("filename")); + Socket::Connection in_out = Socket::Connection(fileno(stdin), fileno(stdout)); + std::string meta_str = source.getHeader(); + + //send the header + { + in_out.Send("DTSC"); + unsigned int size = htonl(meta_str.size()); + in_out.Send(std::string((char*)&size, (size_t)4)); + in_out.Send(meta_str); + } + + JSON::Value meta = JSON::fromDTMI(meta_str); + JSON::Value last_pack; + + long long now, timeDiff = 0, lastTime = 0; + + while (in_out.connected()){ + if (in_out.spool() && in_out.Received().find('\n') != std::string::npos){ + std::string cmd = in_out.Received().substr(0, in_out.Received().find('\n')); + in_out.Received().erase(0, in_out.Received().find('\n')+1); + if (cmd != ""){ + switch (cmd[0]){ + case 'P':{ //Push + in_out.close();//pushing to VoD makes no sense + } break; + case 'S':{ //Stats + /// \todo Parse stats command properly. + /* Stats(cmd.substr(2)); */ + } break; + case 's':{ //second-seek + int second = JSON::Value(cmd.substr(2)).asInt(); + double keyms = meta["video"]["keyms"].asInt(); + if (keyms <= 0){keyms = 2000;} + source.seek_frame(second / (keyms / 1000.0)); + } break; + case 'f':{ //frame-seek + source.seek_frame(JSON::Value(cmd.substr(2)).asInt()); + } break; + case 'p':{ //play + playing = -1; + } break; + case 'o':{ //once-play + if (playing < 0){playing = 0;} + ++playing; + } break; + case 'q':{ //quit-playing + playing = 0; + } break; + } + } + } + if (playing != 0){ + now = getNowMS(); + if (now - timeDiff >= lastTime || lastTime - (now - timeDiff) > 15000) { + std::string packet = source.getPacket(); + last_pack = JSON::fromDTMI(packet); + lastTime = last_pack["time"].asInt(); + if ((now - timeDiff - lastTime) > 15000 || (now - timeDiff - lastTime < -15000)){ + timeDiff = now - lastTime; + } + //insert proper header for this type of data + in_out.Send("DTPD"); + //insert the packet length + unsigned int size = htonl(packet.size()); + in_out.Send(std::string((char*)&size, (size_t)4)); + in_out.Send(packet); + } else { + usleep(std::min(14999LL, lastTime - (now - timeDiff)) * 1000); + } + if (playing > 0){--playing;} + } } - std::string filename = argv[1]; - #if DEBUG >= 3 - std::cerr << "VoD " << filename << std::endl; - #endif - Player::File file(filename); - file.Play(); return 0; } - diff --git a/src/player.h b/src/player.h deleted file mode 100644 index a61bb50d..00000000 --- a/src/player.h +++ /dev/null @@ -1,28 +0,0 @@ -/// \file player.h -/// Provides functionality for playing files for Video on Demand - -#pragma once - -#include "buffer_stream.h" - -namespace Player{ - class File{ - private: - std::ifstream fileSrc; ///