Made src directory structure more sane - all binaries are now built in src but sources are divided amongst subdirs of src
This commit is contained in:
parent
3b98ac6547
commit
1762ae9724
29 changed files with 40 additions and 42 deletions
301
src/buffer/buffer.cpp
Normal file
301
src/buffer/buffer.cpp
Normal file
|
@ -0,0 +1,301 @@
|
|||
/// \file buffer.cpp
|
||||
/// Contains the main code for the Buffer.
|
||||
|
||||
#include <fcntl.h>
|
||||
#include <iostream>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
#include <cstdlib>
|
||||
#include <cstdio>
|
||||
#include <string.h>
|
||||
#include <unistd.h>
|
||||
#include <signal.h>
|
||||
#include <sstream>
|
||||
#include <sys/time.h>
|
||||
#include <mist/config.h>
|
||||
#include "buffer_stream.h"
|
||||
#include <mist/stream.h>
|
||||
|
||||
/// Holds all code unique to the Buffer.
|
||||
namespace Buffer {
|
||||
|
||||
volatile bool buffer_running = true; ///< Set to false when shutting down.
|
||||
Stream * thisStream = 0;
|
||||
Socket::Server SS; ///< The server socket.
|
||||
|
||||
/// Gets the current system time in milliseconds.
|
||||
long long int getNowMS(){
|
||||
timeval t;
|
||||
gettimeofday( &t, 0);
|
||||
return t.tv_sec * 1000 + t.tv_usec / 1000;
|
||||
} //getNowMS
|
||||
|
||||
void handleStats(void * empty){
|
||||
if (empty != 0){
|
||||
return;
|
||||
}
|
||||
std::string double_newline = "\n\n";
|
||||
Socket::Connection StatsSocket = Socket::Connection("/tmp/mist/statistics", true);
|
||||
while (buffer_running){
|
||||
usleep(1000000); //sleep one second
|
||||
Stream::get()->cleanUsers();
|
||||
if ( !StatsSocket.connected()){
|
||||
StatsSocket = Socket::Connection("/tmp/mist/statistics", true);
|
||||
}
|
||||
if (StatsSocket.connected()){
|
||||
StatsSocket.Send(Stream::get()->getStats());
|
||||
StatsSocket.Send(double_newline);
|
||||
StatsSocket.flush();
|
||||
}
|
||||
}
|
||||
StatsSocket.close();
|
||||
}
|
||||
|
||||
void handleUser(void * v_usr){
|
||||
user * usr = (user*)v_usr;
|
||||
#if DEBUG >= 5
|
||||
std::cerr << "Thread launched for user " << usr->MyStr << ", socket number " << usr->S.getSocket() << std::endl;
|
||||
#endif
|
||||
|
||||
usr->myRing = thisStream->getRing();
|
||||
if (thisStream->getStream()->metadata && thisStream->getHeader().size() > 0){
|
||||
usr->S.SendNow(thisStream->getHeader());
|
||||
}
|
||||
|
||||
while (usr->S.connected()){
|
||||
usleep(5000); //sleep 5ms
|
||||
if ( !usr->myRing->playCount || !usr->Send()){
|
||||
if (usr->myRing->updated){
|
||||
Stream::get()->getReadLock();
|
||||
usr->S.SendNow(Stream::get()->getStream()->metadata.toNetPacked());
|
||||
Stream::get()->dropReadLock();
|
||||
usr->myRing->updated = false;
|
||||
}
|
||||
if (usr->S.spool()){
|
||||
while (usr->S.Received().size()){
|
||||
//delete anything that doesn't end with a newline
|
||||
if ( !usr->S.Received().get().empty() && *(usr->S.Received().get().rbegin()) != '\n'){
|
||||
usr->S.Received().get().clear();
|
||||
continue;
|
||||
}
|
||||
usr->S.Received().get().resize(usr->S.Received().get().size() - 1);
|
||||
if ( !usr->S.Received().get().empty()){
|
||||
switch (usr->S.Received().get()[0]){
|
||||
case 'P': { //Push
|
||||
std::cout << "Push attempt from IP " << usr->S.Received().get().substr(2) << std::endl;
|
||||
if (thisStream->checkWaitingIP(usr->S.Received().get().substr(2))){
|
||||
usr->S.Received().get().clear();
|
||||
if (thisStream->setInput(usr->S)){
|
||||
std::cout << "Push accepted!" << std::endl;
|
||||
usr->S = Socket::Connection( -1);
|
||||
return;
|
||||
}else{
|
||||
usr->Disconnect("Push denied - push already in progress!");
|
||||
}
|
||||
}else{
|
||||
usr->Disconnect("Push denied - invalid IP address!");
|
||||
}
|
||||
break;
|
||||
}
|
||||
case 'S': { //Stats
|
||||
usr->tmpStats = Stats(usr->S.Received().get().substr(2));
|
||||
unsigned int secs = usr->tmpStats.conntime - usr->lastStats.conntime;
|
||||
if (secs < 1){
|
||||
secs = 1;
|
||||
}
|
||||
usr->curr_up = (usr->tmpStats.up - usr->lastStats.up) / secs;
|
||||
usr->curr_down = (usr->tmpStats.down - usr->lastStats.down) / secs;
|
||||
usr->lastStats = usr->tmpStats;
|
||||
thisStream->saveStats(usr->MyStr, usr->tmpStats);
|
||||
break;
|
||||
}
|
||||
case 's': { //second-seek
|
||||
unsigned int ms = JSON::Value(usr->S.Received().get().substr(2)).asInt();
|
||||
usr->myRing->waiting = false;
|
||||
usr->myRing->starved = false;
|
||||
usr->myRing->b = thisStream->getStream()->msSeek(ms);
|
||||
if (usr->myRing->playCount > 0){
|
||||
usr->myRing->playCount = 0;
|
||||
}
|
||||
break;
|
||||
}
|
||||
case 'f': { //frame-seek
|
||||
unsigned int frameno = JSON::Value(usr->S.Received().get().substr(2)).asInt();
|
||||
usr->myRing->waiting = false;
|
||||
usr->myRing->starved = false;
|
||||
usr->myRing->b = thisStream->getStream()->frameSeek(frameno);
|
||||
if (usr->myRing->playCount > 0){
|
||||
usr->myRing->playCount = 0;
|
||||
}
|
||||
break;
|
||||
}
|
||||
case 'p': { //play
|
||||
usr->myRing->playCount = -1;
|
||||
break;
|
||||
}
|
||||
case 'o': { //once-play
|
||||
if (usr->myRing->playCount >= 0){
|
||||
usr->myRing->playCount++;
|
||||
}
|
||||
break;
|
||||
}
|
||||
case 'q': { //quit-playing
|
||||
usr->myRing->playCount = 0;
|
||||
break;
|
||||
}
|
||||
}
|
||||
usr->S.Received().get().clear();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
usr->Disconnect("Socket closed.");
|
||||
}
|
||||
|
||||
/// Loop reading DTSC data from stdin and processing it at the correct speed.
|
||||
void handleStdin(void * empty){
|
||||
if (empty != 0){
|
||||
return;
|
||||
}
|
||||
long long int timeDiff = 0; //difference between local time and stream time
|
||||
unsigned int lastPacket = 0; //last parsed packet timestamp
|
||||
std::string inBuffer;
|
||||
char charBuffer[1024 * 10];
|
||||
unsigned int charCount;
|
||||
long long int now;
|
||||
|
||||
while (std::cin.good() && buffer_running){
|
||||
//slow down packet receiving to real-time
|
||||
now = getNowMS();
|
||||
if (((now - timeDiff) >= lastPacket) || (lastPacket - (now - timeDiff) > 15000)){
|
||||
thisStream->getWriteLock();
|
||||
if (thisStream->getStream()->parsePacket(inBuffer)){
|
||||
thisStream->getStream()->outPacket(0);
|
||||
lastPacket = thisStream->getStream()->getTime();
|
||||
if ((now - timeDiff - lastPacket) > 15000 || (now - timeDiff - lastPacket < -15000)){
|
||||
timeDiff = now - lastPacket;
|
||||
}
|
||||
thisStream->dropWriteLock(true);
|
||||
}else{
|
||||
thisStream->dropWriteLock(false);
|
||||
std::cin.read(charBuffer, 1024 * 10);
|
||||
charCount = std::cin.gcount();
|
||||
inBuffer.append(charBuffer, charCount);
|
||||
}
|
||||
}else{
|
||||
usleep(std::min(14999LL, lastPacket - (now - timeDiff)) * 1000);
|
||||
}
|
||||
}
|
||||
buffer_running = false;
|
||||
}
|
||||
|
||||
/// Loop reading DTSC data from an IP push address.
|
||||
/// No changes to the speed are made.
|
||||
void handlePushin(void * empty){
|
||||
if (empty != 0){
|
||||
return;
|
||||
}
|
||||
while (buffer_running){
|
||||
if (thisStream->getIPInput().connected()){
|
||||
if (thisStream->getIPInput().spool()){
|
||||
bool packed_parsed = false;
|
||||
do{
|
||||
thisStream->getWriteLock();
|
||||
if (thisStream->getStream()->parsePacket(thisStream->getIPInput().Received())){
|
||||
thisStream->getStream()->outPacket(0);
|
||||
thisStream->dropWriteLock(true);
|
||||
packed_parsed = true;
|
||||
}else{
|
||||
thisStream->dropWriteLock(false);
|
||||
packed_parsed = false;
|
||||
usleep(1000); //1ms wait
|
||||
}
|
||||
}while (packed_parsed);
|
||||
}else{
|
||||
usleep(1000); //1ms wait
|
||||
}
|
||||
}else{
|
||||
usleep(1000000); //1s wait
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Starts a loop, waiting for connections to send data to.
|
||||
int Start(int argc, char ** argv){
|
||||
Util::Config conf = Util::Config(argv[0], PACKAGE_VERSION);
|
||||
conf.addOption("stream_name",
|
||||
JSON::fromString("{\"arg_num\":1, \"arg\":\"string\", \"help\":\"Name of the stream this buffer will be providing.\"}"));
|
||||
conf.addOption("awaiting_ip",
|
||||
JSON::fromString(
|
||||
"{\"arg_num\":2, \"arg\":\"string\", \"default\":\"\", \"help\":\"IP address to expect incoming data from. This will completely disable reading from standard input if used.\"}"));
|
||||
conf.addOption("reportstats",
|
||||
JSON::fromString("{\"default\":0, \"help\":\"Report stats to a controller process.\", \"short\":\"s\", \"long\":\"reportstats\"}"));
|
||||
conf.addOption("time",
|
||||
JSON::fromString(
|
||||
"{\"default\":0, \"arg\": \"integer\", \"help\":\"Buffer a specied amount of time in ms.\", \"short\":\"t\", \"long\":\"time\"}"));
|
||||
conf.parseArgs(argc, argv);
|
||||
|
||||
std::string name = conf.getString("stream_name");
|
||||
|
||||
SS = Util::Stream::makeLive(name);
|
||||
if ( !SS.connected()){
|
||||
perror("Could not create stream socket");
|
||||
return 1;
|
||||
}
|
||||
conf.activate();
|
||||
thisStream = Stream::get();
|
||||
thisStream->setName(name);
|
||||
thisStream->getStream()->setBufferTime(conf.getInteger("time"));
|
||||
Socket::Connection incoming;
|
||||
Socket::Connection std_input(fileno(stdin));
|
||||
|
||||
tthread::thread * StatsThread = 0;
|
||||
if (conf.getBool("reportstats")){
|
||||
StatsThread = new tthread::thread(handleStats, 0);
|
||||
}
|
||||
tthread::thread * StdinThread = 0;
|
||||
std::string await_ip = conf.getString("awaiting_ip");
|
||||
if (await_ip == ""){
|
||||
StdinThread = new tthread::thread(handleStdin, 0);
|
||||
}else{
|
||||
thisStream->setWaitingIP(await_ip);
|
||||
StdinThread = new tthread::thread(handlePushin, 0);
|
||||
}
|
||||
|
||||
while (buffer_running && SS.connected() && conf.is_active){
|
||||
//check for new connections, accept them if there are any
|
||||
//starts a thread for every accepted connection
|
||||
incoming = SS.accept(true);
|
||||
if (incoming.connected()){
|
||||
user * usr_ptr = new user(incoming);
|
||||
thisStream->addUser(usr_ptr);
|
||||
usr_ptr->Thread = new tthread::thread(handleUser, (void *)usr_ptr);
|
||||
}
|
||||
} //main loop
|
||||
|
||||
// disconnect listener
|
||||
buffer_running = false;
|
||||
std::cout << "Buffer shutting down" << std::endl;
|
||||
SS.close();
|
||||
if (StatsThread){
|
||||
StatsThread->join();
|
||||
delete StatsThread;
|
||||
}
|
||||
if (thisStream->getIPInput().connected()){
|
||||
thisStream->getIPInput().close();
|
||||
}
|
||||
StdinThread->join();
|
||||
delete StdinThread;
|
||||
delete thisStream;
|
||||
return 0;
|
||||
}
|
||||
|
||||
}
|
||||
;
|
||||
//Buffer namespace
|
||||
|
||||
/// Entry point for Buffer, simply calls Buffer::Start().
|
||||
int main(int argc, char ** argv){
|
||||
return Buffer::Start(argc, argv);
|
||||
} //main
|
240
src/buffer/buffer_stream.cpp
Normal file
240
src/buffer/buffer_stream.cpp
Normal file
|
@ -0,0 +1,240 @@
|
|||
/// \file buffer_stream.cpp
|
||||
/// Contains definitions for buffer streams.
|
||||
|
||||
#include "buffer_stream.h"
|
||||
#include <mist/timing.h>
|
||||
|
||||
/// Stores the globally equal reference.
|
||||
Buffer::Stream * Buffer::Stream::ref = 0;
|
||||
|
||||
/// Returns a globally equal reference to this class.
|
||||
Buffer::Stream * Buffer::Stream::get(){
|
||||
static tthread::mutex creator;
|
||||
if (ref == 0){
|
||||
//prevent creating two at the same time
|
||||
creator.lock();
|
||||
if (ref == 0){
|
||||
ref = new Stream();
|
||||
}
|
||||
creator.unlock();
|
||||
}
|
||||
return ref;
|
||||
}
|
||||
|
||||
/// Creates a new DTSC::Stream object, private function so only one instance can exist.
|
||||
Buffer::Stream::Stream(){
|
||||
Strm = new DTSC::Stream(5);
|
||||
readers = 0;
|
||||
writers = 0;
|
||||
}
|
||||
|
||||
/// Do cleanup on delete.
|
||||
Buffer::Stream::~Stream(){
|
||||
tthread::lock_guard<tthread::mutex> guard(stats_mutex);
|
||||
for (usersIt = users.begin(); usersIt != users.end(); usersIt++){
|
||||
if (( * *usersIt).S.connected()){
|
||||
( * *usersIt).S.close();
|
||||
}
|
||||
}
|
||||
moreData.notify_all();
|
||||
delete Strm;
|
||||
}
|
||||
|
||||
/// Calculate and return the current statistics in JSON format.
|
||||
std::string & Buffer::Stream::getStats(){
|
||||
static std::string ret;
|
||||
long long int now = Util::epoch();
|
||||
unsigned int tot_up = 0, tot_down = 0, tot_count = 0;
|
||||
tthread::lock_guard<tthread::mutex> guard(stats_mutex);
|
||||
if (users.size() > 0){
|
||||
for (usersIt = users.begin(); usersIt != users.end(); usersIt++){
|
||||
tot_down += ( * *usersIt).curr_down;
|
||||
tot_up += ( * *usersIt).curr_up;
|
||||
tot_count++;
|
||||
}
|
||||
}
|
||||
Storage["totals"]["down"] = tot_down;
|
||||
Storage["totals"]["up"] = tot_up;
|
||||
Storage["totals"]["count"] = tot_count;
|
||||
Storage["totals"]["now"] = now;
|
||||
Storage["buffer"] = name;
|
||||
Storage["meta"] = Strm->metadata;
|
||||
if (Storage["meta"].isMember("audio")){
|
||||
Storage["meta"]["audio"].removeMember("init");
|
||||
}
|
||||
if (Storage["meta"].isMember("video")){
|
||||
Storage["meta"]["video"].removeMember("init");
|
||||
}
|
||||
ret = Storage.toString();
|
||||
Storage["log"].null();
|
||||
return ret;
|
||||
}
|
||||
|
||||
/// Get a new DTSC::Ring object for a user.
|
||||
DTSC::Ring * Buffer::Stream::getRing(){
|
||||
return Strm->getRing();
|
||||
}
|
||||
|
||||
/// Drop a DTSC::Ring object.
|
||||
void Buffer::Stream::dropRing(DTSC::Ring * ring){
|
||||
Strm->dropRing(ring);
|
||||
}
|
||||
|
||||
/// Get the (constant) header data of this stream.
|
||||
std::string & Buffer::Stream::getHeader(){
|
||||
return Strm->outHeader();
|
||||
}
|
||||
|
||||
/// Set the IP address to accept push data from.
|
||||
void Buffer::Stream::setWaitingIP(std::string ip){
|
||||
waiting_ip = ip;
|
||||
}
|
||||
|
||||
/// Check if this is the IP address to accept push data from.
|
||||
bool Buffer::Stream::checkWaitingIP(std::string ip){
|
||||
if (ip == waiting_ip || ip == "::ffff:" + waiting_ip){
|
||||
return true;
|
||||
}else{
|
||||
std::cout << ip << " != (::ffff:)" << waiting_ip << std::endl;
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/// Sets the current socket for push data.
|
||||
bool Buffer::Stream::setInput(Socket::Connection S){
|
||||
if (ip_input.connected()){
|
||||
return false;
|
||||
}else{
|
||||
ip_input = S;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
/// Gets the current socket for push data.
|
||||
Socket::Connection & Buffer::Stream::getIPInput(){
|
||||
return ip_input;
|
||||
}
|
||||
|
||||
/// Stores intermediate statistics.
|
||||
void Buffer::Stream::saveStats(std::string username, Stats & stats){
|
||||
tthread::lock_guard<tthread::mutex> guard(stats_mutex);
|
||||
Storage["curr"][username]["connector"] = stats.connector;
|
||||
Storage["curr"][username]["up"] = stats.up;
|
||||
Storage["curr"][username]["down"] = stats.down;
|
||||
Storage["curr"][username]["conntime"] = stats.conntime;
|
||||
Storage["curr"][username]["host"] = stats.host;
|
||||
Storage["curr"][username]["start"] = Util::epoch() - stats.conntime;
|
||||
}
|
||||
|
||||
/// Stores final statistics.
|
||||
void Buffer::Stream::clearStats(std::string username, Stats & stats, std::string reason){
|
||||
tthread::lock_guard<tthread::mutex> guard(stats_mutex);
|
||||
if (Storage["curr"].isMember(username)){
|
||||
Storage["curr"].removeMember(username);
|
||||
#if DEBUG >= 4
|
||||
std::cout << "Disconnected user " << username << ": " << reason << ". " << stats.connector << " transferred " << stats.up << " up and "
|
||||
<< stats.down << " down in " << stats.conntime << " seconds to " << stats.host << std::endl;
|
||||
#endif
|
||||
}
|
||||
Storage["log"][username]["connector"] = stats.connector;
|
||||
Storage["log"][username]["up"] = stats.up;
|
||||
Storage["log"][username]["down"] = stats.down;
|
||||
Storage["log"][username]["conntime"] = stats.conntime;
|
||||
Storage["log"][username]["host"] = stats.host;
|
||||
Storage["log"][username]["start"] = Util::epoch() - stats.conntime;
|
||||
}
|
||||
|
||||
/// Cleans up broken connections
|
||||
void Buffer::Stream::cleanUsers(){
|
||||
bool repeat = false;
|
||||
tthread::lock_guard<tthread::mutex> guard(stats_mutex);
|
||||
do{
|
||||
repeat = false;
|
||||
if (users.size() > 0){
|
||||
for (usersIt = users.begin(); usersIt != users.end(); usersIt++){
|
||||
if (( * *usersIt).Thread == 0 && !( * *usersIt).S.connected()){
|
||||
delete *usersIt;
|
||||
users.erase(usersIt);
|
||||
repeat = true;
|
||||
break;
|
||||
}else{
|
||||
if ( !( * *usersIt).S.connected()){
|
||||
if (( * *usersIt).Thread->joinable()){
|
||||
( * *usersIt).Thread->join();
|
||||
delete ( * *usersIt).Thread;
|
||||
( * *usersIt).Thread = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}while (repeat);
|
||||
}
|
||||
|
||||
/// Blocks until writing is safe.
|
||||
void Buffer::Stream::getWriteLock(){
|
||||
rw_mutex.lock();
|
||||
writers++;
|
||||
while (writers != 1 && readers != 0){
|
||||
rw_change.wait(rw_mutex);
|
||||
}
|
||||
rw_mutex.unlock();
|
||||
}
|
||||
|
||||
/// Drops a previously gotten write lock.
|
||||
void Buffer::Stream::dropWriteLock(bool newpackets_available){
|
||||
if (newpackets_available){
|
||||
if (Strm->getPacket(0).isMember("keyframe")){
|
||||
stats_mutex.lock();
|
||||
Strm->updateHeaders();
|
||||
stats_mutex.unlock();
|
||||
}
|
||||
}
|
||||
rw_mutex.lock();
|
||||
writers--;
|
||||
rw_mutex.unlock();
|
||||
rw_change.notify_all();
|
||||
if (newpackets_available){
|
||||
moreData.notify_all();
|
||||
}
|
||||
}
|
||||
|
||||
/// Blocks until reading is safe.
|
||||
void Buffer::Stream::getReadLock(){
|
||||
rw_mutex.lock();
|
||||
while (writers > 0){
|
||||
rw_change.wait(rw_mutex);
|
||||
}
|
||||
readers++;
|
||||
rw_mutex.unlock();
|
||||
}
|
||||
|
||||
/// Drops a previously gotten read lock.
|
||||
void Buffer::Stream::dropReadLock(){
|
||||
rw_mutex.lock();
|
||||
readers--;
|
||||
rw_mutex.unlock();
|
||||
rw_change.notify_all();
|
||||
}
|
||||
|
||||
/// Retrieves a reference to the DTSC::Stream
|
||||
DTSC::Stream * Buffer::Stream::getStream(){
|
||||
return Strm;
|
||||
}
|
||||
|
||||
/// Sets the buffer name.
|
||||
void Buffer::Stream::setName(std::string n){
|
||||
name = n;
|
||||
}
|
||||
|
||||
/// Add a user to the userlist.
|
||||
void Buffer::Stream::addUser(user * new_user){
|
||||
tthread::lock_guard<tthread::mutex> guard(stats_mutex);
|
||||
users.push_back(new_user);
|
||||
}
|
||||
|
||||
/// Blocks the thread until new data is available.
|
||||
void Buffer::Stream::waitForData(){
|
||||
tthread::lock_guard<tthread::mutex> guard(stats_mutex);
|
||||
moreData.wait(stats_mutex);
|
||||
}
|
76
src/buffer/buffer_stream.h
Normal file
76
src/buffer/buffer_stream.h
Normal file
|
@ -0,0 +1,76 @@
|
|||
/// \file buffer_stream.h
|
||||
/// Contains definitions for buffer streams.
|
||||
|
||||
#pragma once
|
||||
#include <string>
|
||||
#include <mist/dtsc.h>
|
||||
#include <mist/json.h>
|
||||
#include <mist/socket.h>
|
||||
#include "tinythread.h"
|
||||
#include "buffer_user.h"
|
||||
|
||||
namespace Buffer {
|
||||
/// Keeps track of a single streams inputs and outputs, taking care of thread safety and all other related issues.
|
||||
class Stream{
|
||||
public:
|
||||
/// Get a reference to this Stream object.
|
||||
static Stream * get();
|
||||
/// Get the current statistics in JSON format.
|
||||
std::string & getStats();
|
||||
/// Get a new DTSC::Ring object for a user.
|
||||
DTSC::Ring * getRing();
|
||||
/// Drop a DTSC::Ring object.
|
||||
void dropRing(DTSC::Ring * ring);
|
||||
/// Get the (constant) header data of this stream.
|
||||
std::string & getHeader();
|
||||
/// Set the IP address to accept push data from.
|
||||
void setWaitingIP(std::string ip);
|
||||
/// Check if this is the IP address to accept push data from.
|
||||
bool checkWaitingIP(std::string ip);
|
||||
/// Sets the current socket for push data.
|
||||
bool setInput(Socket::Connection S);
|
||||
/// Gets the current socket for push data.
|
||||
Socket::Connection & getIPInput();
|
||||
/// Stores intermediate statistics.
|
||||
void saveStats(std::string username, Stats & stats);
|
||||
/// Stores final statistics.
|
||||
void clearStats(std::string username, Stats & stats, std::string reason);
|
||||
/// Cleans up broken connections
|
||||
void cleanUsers();
|
||||
/// Blocks until writing is safe.
|
||||
void getWriteLock();
|
||||
/// Drops a previously gotten write lock.
|
||||
void dropWriteLock(bool newpackets_available);
|
||||
/// Blocks until reading is safe.
|
||||
void getReadLock();
|
||||
/// Drops a previously gotten read lock.
|
||||
void dropReadLock();
|
||||
/// Retrieves a reference to the DTSC::Stream
|
||||
DTSC::Stream * getStream();
|
||||
/// Sets the buffer name.
|
||||
void setName(std::string n);
|
||||
/// Add a user to the userlist.
|
||||
void addUser(user * new_user);
|
||||
/// Blocks the thread until new data is available.
|
||||
void waitForData();
|
||||
/// Cleanup function
|
||||
~Stream();
|
||||
private:
|
||||
volatile int readers; ///< Current count of active readers;
|
||||
volatile int writers; ///< Current count of waiting/active writers.
|
||||
tthread::mutex rw_mutex; ///< Mutex for read/write locking.
|
||||
tthread::condition_variable rw_change; ///< Triggered when reader/writer count changes.
|
||||
static Stream * ref;
|
||||
Stream();
|
||||
JSON::Value Storage; ///< Global storage of data.
|
||||
DTSC::Stream * Strm;
|
||||
std::string waiting_ip; ///< IP address for media push.
|
||||
Socket::Connection ip_input; ///< Connection used for media push.
|
||||
tthread::mutex stats_mutex; ///< Mutex for stats/users modifications.
|
||||
std::vector<user*> users; ///< All connected users.
|
||||
std::vector<user*>::iterator usersIt; ///< Iterator for all connected users.
|
||||
std::string name; ///< Name for this buffer.
|
||||
tthread::condition_variable moreData; ///< Triggered when more data becomes available.
|
||||
};
|
||||
}
|
||||
;
|
149
src/buffer/buffer_user.cpp
Normal file
149
src/buffer/buffer_user.cpp
Normal file
|
@ -0,0 +1,149 @@
|
|||
/// \file buffer_user.cpp
|
||||
/// Contains code for buffer users.
|
||||
|
||||
#include "buffer_user.h"
|
||||
#include "buffer_stream.h"
|
||||
#include <sstream>
|
||||
#include <stdlib.h> //for atoi and friends
|
||||
int Buffer::user::UserCount = 0;
|
||||
|
||||
/// Creates a new user from a newly connected socket.
|
||||
/// Also prints "User connected" text to stdout.
|
||||
Buffer::user::user(Socket::Connection fd){
|
||||
S = fd;
|
||||
MyNum = UserCount++;
|
||||
std::stringstream st;
|
||||
st << MyNum;
|
||||
MyStr = st.str();
|
||||
curr_up = 0;
|
||||
curr_down = 0;
|
||||
currsend = 0;
|
||||
myRing = 0;
|
||||
Thread = 0;
|
||||
gotproperaudio = false;
|
||||
lastpointer = 0;
|
||||
} //constructor
|
||||
|
||||
/// Drops held DTSC::Ring class, if one is held.
|
||||
Buffer::user::~user(){
|
||||
Stream::get()->dropRing(myRing);
|
||||
} //destructor
|
||||
|
||||
/// Disconnects the current user. Doesn't do anything if already disconnected.
|
||||
/// Prints "Disconnected user" to stdout if disconnect took place.
|
||||
void Buffer::user::Disconnect(std::string reason){
|
||||
if (S.connected()){
|
||||
S.close();
|
||||
}
|
||||
Stream::get()->clearStats(MyStr, lastStats, reason);
|
||||
} //Disconnect
|
||||
|
||||
/// Tries to send the current buffer, returns true if success, false otherwise.
|
||||
/// Has a side effect of dropping the connection if send will never complete.
|
||||
bool Buffer::user::doSend(const char * ptr, int len){
|
||||
if ( !len){
|
||||
return true;
|
||||
} //do not do empty sends
|
||||
int r = S.iwrite(ptr + currsend, len - currsend);
|
||||
if (r <= 0){
|
||||
if (errno == EWOULDBLOCK){
|
||||
return false;
|
||||
}
|
||||
Disconnect(S.getError());
|
||||
return false;
|
||||
}
|
||||
currsend += r;
|
||||
return (currsend == len);
|
||||
} //doSend
|
||||
|
||||
/// Try to send data to this user. Disconnects if any problems occur.
|
||||
bool Buffer::user::Send(){
|
||||
if ( !myRing){
|
||||
return false;
|
||||
} //no ring!
|
||||
if ( !S.connected()){
|
||||
return false;
|
||||
} //cancel if not connected
|
||||
if (myRing->waiting){
|
||||
Stream::get()->waitForData();
|
||||
if ( !myRing->waiting){
|
||||
Stream::get()->getReadLock();
|
||||
if (Stream::get()->getStream()->getPacket(myRing->b).isMember("keyframe") && myRing->playCount > 0){
|
||||
myRing->playCount--;
|
||||
if ( !myRing->playCount){
|
||||
JSON::Value pausemark;
|
||||
pausemark["datatype"] = "pause_marker";
|
||||
pausemark["time"] = Stream::get()->getStream()->getPacket(myRing->b)["time"].asInt();
|
||||
pausemark.toPacked();
|
||||
S.SendNow(pausemark.toNetPacked());
|
||||
}
|
||||
}
|
||||
Stream::get()->dropReadLock();
|
||||
}
|
||||
return false;
|
||||
} //still waiting for next buffer?
|
||||
if (myRing->starved){
|
||||
//if corrupt data, warn and get new DTSC::Ring
|
||||
std::cout << "Warning: User " << MyNum << " was send corrupt video data and send to the next keyframe!" << std::endl;
|
||||
Stream::get()->dropRing(myRing);
|
||||
myRing = Stream::get()->getRing();
|
||||
return false;
|
||||
}
|
||||
//try to complete a send
|
||||
Stream::get()->getReadLock();
|
||||
if (doSend(Stream::get()->getStream()->outPacket(myRing->b).c_str(), Stream::get()->getStream()->outPacket(myRing->b).length())){
|
||||
//switch to next buffer
|
||||
currsend = 0;
|
||||
if (myRing->b <= 0){
|
||||
myRing->waiting = true;
|
||||
return false;
|
||||
} //no next buffer? go in waiting mode.
|
||||
myRing->b--;
|
||||
if (Stream::get()->getStream()->getPacket(myRing->b).isMember("keyframe") && myRing->playCount > 0){
|
||||
myRing->playCount--;
|
||||
if ( !myRing->playCount){
|
||||
JSON::Value pausemark;
|
||||
pausemark["datatype"] = "pause_marker";
|
||||
pausemark["time"] = Stream::get()->getStream()->getPacket(myRing->b)["time"].asInt();
|
||||
pausemark.toPacked();
|
||||
S.SendNow(pausemark.toNetPacked());
|
||||
}
|
||||
}
|
||||
Stream::get()->dropReadLock();
|
||||
return false;
|
||||
} //completed a send
|
||||
Stream::get()->dropReadLock();
|
||||
return true;
|
||||
} //send
|
||||
|
||||
/// Default constructor - should not be in use.
|
||||
Buffer::Stats::Stats(){
|
||||
up = 0;
|
||||
down = 0;
|
||||
conntime = 0;
|
||||
}
|
||||
|
||||
/// Reads a stats string and parses it to the internal representation.
|
||||
Buffer::Stats::Stats(std::string s){
|
||||
size_t f = s.find(' ');
|
||||
if (f != std::string::npos){
|
||||
host = s.substr(0, f);
|
||||
s.erase(0, f + 1);
|
||||
}
|
||||
f = s.find(' ');
|
||||
if (f != std::string::npos){
|
||||
connector = s.substr(0, f);
|
||||
s.erase(0, f + 1);
|
||||
}
|
||||
f = s.find(' ');
|
||||
if (f != std::string::npos){
|
||||
conntime = atoi(s.substr(0, f).c_str());
|
||||
s.erase(0, f + 1);
|
||||
}
|
||||
f = s.find(' ');
|
||||
if (f != std::string::npos){
|
||||
up = atoi(s.substr(0, f).c_str());
|
||||
s.erase(0, f + 1);
|
||||
down = atoi(s.c_str());
|
||||
}
|
||||
}
|
55
src/buffer/buffer_user.h
Normal file
55
src/buffer/buffer_user.h
Normal file
|
@ -0,0 +1,55 @@
|
|||
/// \file buffer_user.h
|
||||
/// Contains definitions for buffer users.
|
||||
|
||||
#pragma once
|
||||
#include <string>
|
||||
#include <mist/dtsc.h>
|
||||
#include <mist/socket.h>
|
||||
#include "tinythread.h"
|
||||
|
||||
namespace Buffer {
|
||||
/// Converts a stats line to up, down, host, connector and conntime values.
|
||||
class Stats{
|
||||
public:
|
||||
unsigned int up;
|
||||
unsigned int down;
|
||||
std::string host;
|
||||
std::string connector;
|
||||
unsigned int conntime;
|
||||
Stats();
|
||||
Stats(std::string s);
|
||||
};
|
||||
|
||||
/// Holds connected users.
|
||||
/// Keeps track of what buffer users are using and the connection status.
|
||||
class user{
|
||||
public:
|
||||
tthread::thread * Thread; ///< Holds the thread dealing with this user.
|
||||
DTSC::Ring * myRing; ///< Ring of the buffer for this user.
|
||||
int MyNum; ///< User ID of this user.
|
||||
std::string MyStr; ///< User ID of this user as a string.
|
||||
std::string inbuffer; ///< Used to buffer input data.
|
||||
int currsend; ///< Current amount of bytes sent.
|
||||
Stats lastStats; ///< Holds last known stats for this connection.
|
||||
Stats tmpStats; ///< Holds temporary stats for this connection.
|
||||
unsigned int curr_up; ///< Holds the current estimated transfer speed up.
|
||||
unsigned int curr_down; ///< Holds the current estimated transfer speed down.
|
||||
bool gotproperaudio; ///< Whether the user received proper audio yet.
|
||||
void * lastpointer; ///< Pointer to data part of current buffer.
|
||||
static int UserCount; ///< Global user counter.
|
||||
Socket::Connection S; ///< Connection to user
|
||||
/// Creates a new user from a newly connected socket.
|
||||
/// Also prints "User connected" text to stdout.
|
||||
user(Socket::Connection fd);
|
||||
/// Drops held DTSC::Ring class, if one is held.
|
||||
~user();
|
||||
/// Disconnects the current user. Doesn't do anything if already disconnected.
|
||||
/// Prints "Disconnected user" to stdout if disconnect took place.
|
||||
void Disconnect(std::string reason);
|
||||
/// Tries to send the current buffer, returns true if success, false otherwise.
|
||||
/// Has a side effect of dropping the connection if send will never complete.
|
||||
bool doSend(const char * ptr, int len);
|
||||
/// Try to send data to this user. Disconnects if any problems occur.
|
||||
bool Send();
|
||||
};
|
||||
}
|
228
src/buffer/player.cpp
Normal file
228
src/buffer/player.cpp
Normal file
|
@ -0,0 +1,228 @@
|
|||
/// \file player.cpp
|
||||
/// Holds all code for the MistPlayer application used for VoD streams.
|
||||
|
||||
#include <iostream>//for std::cerr
|
||||
#include <stdio.h> //for fileno
|
||||
#include <stdlib.h> //for atoi
|
||||
#include <sys/time.h>
|
||||
#include <mist/dtsc.h>
|
||||
#include <mist/json.h>
|
||||
#include <mist/config.h>
|
||||
#include <mist/socket.h>
|
||||
#include <mist/timing.h>
|
||||
|
||||
//under cygwin, recv blocks for ~15ms if no data is available.
|
||||
//This is a hack to keep performance decent with that bug present.
|
||||
#ifdef __CYGWIN__
|
||||
#define CYG_DEFI int cyg_count;
|
||||
#define CYG_INCR cyg_count++;
|
||||
#define CYG_LOOP (cyg_count % 20 == 0) &&
|
||||
#else
|
||||
#define CYG_DEFI
|
||||
#define CYG_INCR
|
||||
#define CYG_LOOP
|
||||
#endif
|
||||
|
||||
/// Copy of stats from buffer_user.cpp
|
||||
class Stats{
|
||||
public:
|
||||
unsigned int up;
|
||||
unsigned int down;
|
||||
std::string host;
|
||||
std::string connector;
|
||||
unsigned int conntime;
|
||||
Stats(){
|
||||
up = 0;
|
||||
down = 0;
|
||||
conntime = 0;
|
||||
}
|
||||
;
|
||||
/// Reads a stats string and parses it to the internal representation.
|
||||
Stats(std::string s){
|
||||
size_t f = s.find(' ');
|
||||
if (f != std::string::npos){
|
||||
host = s.substr(0, f);
|
||||
s.erase(0, f + 1);
|
||||
}
|
||||
f = s.find(' ');
|
||||
if (f != std::string::npos){
|
||||
connector = s.substr(0, f);
|
||||
s.erase(0, f + 1);
|
||||
}
|
||||
f = s.find(' ');
|
||||
if (f != std::string::npos){
|
||||
conntime = atoi(s.substr(0, f).c_str());
|
||||
s.erase(0, f + 1);
|
||||
}
|
||||
f = s.find(' ');
|
||||
if (f != std::string::npos){
|
||||
up = atoi(s.substr(0, f).c_str());
|
||||
s.erase(0, f + 1);
|
||||
down = atoi(s.c_str());
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
int main(int argc, char** argv){
|
||||
Util::Config conf(argv[0], PACKAGE_VERSION);
|
||||
conf.addOption("filename", JSON::fromString("{\"arg_num\":1, \"help\":\"Name of the file to write to stdout.\"}"));
|
||||
conf.parseArgs(argc, argv);
|
||||
conf.activate();
|
||||
int playing = 0;
|
||||
|
||||
DTSC::File source = DTSC::File(conf.getString("filename"));
|
||||
Socket::Connection in_out = Socket::Connection(fileno(stdout), fileno(stdin));
|
||||
JSON::Value meta = source.getMeta();
|
||||
JSON::Value pausemark;
|
||||
pausemark["datatype"] = "pause_marker";
|
||||
pausemark["time"] = (long long int)0;
|
||||
|
||||
Socket::Connection StatsSocket = Socket::Connection("/tmp/mist/statistics", true);
|
||||
int lasttime = Util::epoch(); //time last packet was sent
|
||||
|
||||
//send the header
|
||||
std::string meta_str = meta.toNetPacked();
|
||||
in_out.Send(meta_str);
|
||||
|
||||
if (meta["video"]["keyms"].asInt() < 11){
|
||||
meta["video"]["keyms"] = (long long int)1000;
|
||||
}
|
||||
JSON::Value last_pack;
|
||||
|
||||
bool meta_sent = false;
|
||||
long long now, lastTime = 0; //for timing of sending packets
|
||||
long long bench = 0; //for benchmarking
|
||||
Stats sts;
|
||||
CYG_DEFI
|
||||
|
||||
while (in_out.connected() && (Util::epoch() - lasttime < 60)){
|
||||
CYG_INCR
|
||||
if (CYG_LOOP in_out.spool()){
|
||||
while (in_out.Received().size()){
|
||||
//delete anything that doesn't end with a newline
|
||||
if ( *(in_out.Received().get().rbegin()) != '\n'){
|
||||
in_out.Received().get().clear();
|
||||
continue;
|
||||
}
|
||||
in_out.Received().get().resize(in_out.Received().get().size() - 1);
|
||||
if ( !in_out.Received().get().empty()){
|
||||
switch (in_out.Received().get()[0]){
|
||||
case 'P': { //Push
|
||||
#if DEBUG >= 4
|
||||
std::cerr << "Received push - ignoring (" << in_out.Received().get() << ")" << std::endl;
|
||||
#endif
|
||||
in_out.close(); //pushing to VoD makes no sense
|
||||
}
|
||||
break;
|
||||
case 'S': { //Stats
|
||||
if ( !StatsSocket.connected()){
|
||||
StatsSocket = Socket::Connection("/tmp/mist/statistics", true);
|
||||
}
|
||||
if (StatsSocket.connected()){
|
||||
sts = Stats(in_out.Received().get().substr(2));
|
||||
JSON::Value json_sts;
|
||||
json_sts["vod"]["down"] = (long long int)sts.down;
|
||||
json_sts["vod"]["up"] = (long long int)sts.up;
|
||||
json_sts["vod"]["time"] = (long long int)sts.conntime;
|
||||
json_sts["vod"]["host"] = sts.host;
|
||||
json_sts["vod"]["connector"] = sts.connector;
|
||||
json_sts["vod"]["filename"] = conf.getString("filename");
|
||||
json_sts["vod"]["now"] = Util::epoch();
|
||||
json_sts["vod"]["start"] = Util::epoch() - sts.conntime;
|
||||
if ( !meta_sent){
|
||||
json_sts["vod"]["meta"] = meta;
|
||||
json_sts["vod"]["meta"]["audio"].removeMember("init");
|
||||
json_sts["vod"]["meta"]["video"].removeMember("init");
|
||||
json_sts["vod"]["meta"].removeMember("keytime");
|
||||
json_sts["vod"]["meta"].removeMember("keybpos");
|
||||
meta_sent = true;
|
||||
}
|
||||
StatsSocket.Send(json_sts.toString().c_str());
|
||||
StatsSocket.Send("\n\n");
|
||||
StatsSocket.flush();
|
||||
}
|
||||
}
|
||||
break;
|
||||
case 's': { //second-seek
|
||||
int ms = JSON::Value(in_out.Received().get().substr(2)).asInt();
|
||||
bool ret = source.seek_time(ms);
|
||||
lastTime = 0;
|
||||
}
|
||||
break;
|
||||
case 'f': { //frame-seek
|
||||
bool ret = source.seek_frame(JSON::Value(in_out.Received().get().substr(2)).asInt());
|
||||
lastTime = 0;
|
||||
}
|
||||
break;
|
||||
case 'p': { //play
|
||||
playing = -1;
|
||||
lastTime = 0;
|
||||
in_out.setBlocking(false);
|
||||
}
|
||||
break;
|
||||
case 'o': { //once-play
|
||||
if (playing <= 0){
|
||||
playing = 1;
|
||||
}
|
||||
++playing;
|
||||
in_out.setBlocking(false);
|
||||
bench = Util::getMS();
|
||||
}
|
||||
break;
|
||||
case 'q': { //quit-playing
|
||||
playing = 0;
|
||||
in_out.setBlocking(true);
|
||||
}
|
||||
break;
|
||||
}
|
||||
in_out.Received().get().clear();
|
||||
}
|
||||
}
|
||||
}
|
||||
if (playing != 0){
|
||||
now = Util::getMS();
|
||||
source.seekNext();
|
||||
if ( !source.getJSON()){
|
||||
playing = 0;
|
||||
}
|
||||
if (source.getJSON().isMember("keyframe")){
|
||||
if (playing == -1 && meta["video"]["keyms"].asInt() > now - lastTime){
|
||||
Util::sleep(meta["video"]["keyms"].asInt() - (now - lastTime));
|
||||
}
|
||||
lastTime = now;
|
||||
if (playing > 0){
|
||||
--playing;
|
||||
}
|
||||
}
|
||||
if (playing == 0){
|
||||
#if DEBUG >= 4
|
||||
std::cerr << "Completed VoD request in MistPlayer (" << (Util::getMS() - bench) << "ms)" << std::endl;
|
||||
#endif
|
||||
pausemark["time"] = source.getJSON()["time"];
|
||||
pausemark.toPacked();
|
||||
in_out.SendNow(pausemark.toNetPacked());
|
||||
in_out.setBlocking(true);
|
||||
}else{
|
||||
lasttime = Util::epoch();
|
||||
//insert proper header for this type of data
|
||||
in_out.Send("DTPD");
|
||||
//insert the packet length
|
||||
unsigned int size = htonl(source.getPacket().size());
|
||||
in_out.Send((char*) &size, 4);
|
||||
in_out.SendNow(source.getPacket());
|
||||
}
|
||||
}else{
|
||||
Util::sleep(10);
|
||||
}
|
||||
}
|
||||
StatsSocket.close();
|
||||
in_out.close();
|
||||
#if DEBUG >= 5
|
||||
if (Util::epoch() - lasttime < 60){
|
||||
std::cerr << "MistPlayer exited (disconnect)." << std::endl;
|
||||
}else{
|
||||
std::cerr << "MistPlayer exited (command timeout)." << std::endl;
|
||||
}
|
||||
#endif
|
||||
return 0;
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue