This commit is contained in:
Thulinma 2011-10-18 09:10:19 +02:00
parent d34bfdc739
commit 6cf88f4cee
4 changed files with 172 additions and 26 deletions

View file

@ -18,7 +18,7 @@
#include <sys/time.h>
#include <sys/wait.h>
#include <sys/types.h>
#include <sys/epoll.h>
#include <signal.h>
#include "../util/socket.h"
#include "../util/http_parser.h"
#include "../util/md5.h"
@ -33,6 +33,35 @@
#define STRINGIFY(x) #x
#define TOSTRING(x) STRINGIFY(x)
Socket::Server API_Socket; ///< Main connection socket.
/// Basic signal handler. Disconnects the server_socket if it receives
/// a SIGINT, SIGHUP or SIGTERM signal, but does nothing for SIGPIPE.
/// Disconnecting the server_socket will terminate the main listening loop
/// and cleanly shut down the process.
void signal_handler (int signum){
switch (signum){
case SIGINT:
#if DEBUG >= 1
fprintf(stderr, "Received SIGINT - closing server socket.\n");
#endif
break;
case SIGHUP:
#if DEBUG >= 1
fprintf(stderr, "Received SIGHUP - closing server socket.\n");
#endif
break;
case SIGTERM:
#if DEBUG >= 1
fprintf(stderr, "Received SIGTERM - closing server socket.\n");
#endif
break;
default: return; break;
}
API_Socket.close();
}//signal_handler
/// Needed for base64_encode function
static const std::string base64_chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
@ -137,7 +166,7 @@ void RSA_Load(){
/// Returns true if the data could be verified, false otherwise.
bool RSA_check(std::string & data, std::string basesign){
std::string sign = base64_decode(basesign);
return (RSA_verify(NID_md5, (const unsigned char*)data.c_str(), data.size(), (const unsigned char*)sign.c_str(), sign.size(), pubkey) == 1);
return (RSA_verify(NID_md5, (unsigned char*)data.c_str(), data.size(), (unsigned char*)sign.c_str(), sign.size(), pubkey) == 1);
}
Json::Value Storage = Json::Value(Json::objectValue); ///< Global storage of data.
@ -223,11 +252,43 @@ void Authorize( Json::Value & Request, Json::Value & Response, ConnectedUser & c
return;
}
void CheckProtocols(Json::Value & p){
static std::map<std::string, std::string> connports;
bool seenHTTP = false;
bool seenRTMP = false;
std::string tmp;
Util::Procs::Stop("RTMP");
for (Json::ValueIterator jit = p.begin(); jit != p.end(); jit++){
if (jit.memberName() == std::string("HTTP")){
tmp = p[jit.memberName()]["port"].asString();
seenHTTP = true;
if (connports["HTTP"] != tmp){Util::Procs::Stop("HTTP");}
connports["HTTP"] = tmp;
if (!Util::Procs::isActive("HTTP")){
Util::Procs::Start("HTTP", std::string("DDV_Conn_HTTP -p ")+tmp);
}
}
if (jit.memberName() == std::string("RTMP")){
tmp = p[jit.memberName()]["port"].asString();
seenRTMP = true;
if (connports["RTMP"] != tmp){Util::Procs::Stop("RTMP");}
connports["RTMP"] = tmp;
if (!Util::Procs::isActive("RTMP")){
Util::Procs::Start("RTMP", std::string("DDV_Conn_RTMP -p ")+tmp);
}
}
}
if (!seenHTTP){Util::Procs::Stop("HTTP");}
if (!seenRTMP){Util::Procs::Stop("RTMP");}
}
void CheckConfig(Json::Value & in, Json::Value & out){
if (in.isObject() && (in.size() > 0)){
for (Json::ValueIterator jit = in.begin(); jit != in.end(); jit++){
if (out.isObject() && out.isMember(jit.memberName())){
Log("CONF", std::string("Updated configuration value ")+jit.memberName());
if (in[jit.memberName()] != out[jit.memberName()]){
Log("CONF", std::string("Updated configuration value ")+jit.memberName());
}
}else{
Log("CONF", std::string("New configuration value ")+jit.memberName());
}
@ -244,20 +305,50 @@ void CheckConfig(Json::Value & in, Json::Value & out){
out["version"] = TOSTRING(VERSION);
}
void startStream(std::string name, Json::Value & data){
Log("BUFF", "(re)starting stream buffer "+name);
std::string URL = data["channel"]["URL"].asString();
std::string preset = data["preset"]["cmd"].asString();
std::string cmd1, cmd2;
if (URL.substr(0, 4) == "push"){
std::string pusher = URL.substr(7);
cmd2 = "DDV_Buffer 500 "+name+" "+pusher;
Util::Procs::Start(name, cmd2);
}else{
cmd1 = "ffmpeg -re -async 2 -i "+URL+" "+preset+" -f flv -";
cmd2 = "DDV_Buffer 500 "+name;
Util::Procs::Start(name, cmd1, cmd2);
}
}
void CheckAllStreams(Json::Value & data){
for (Json::ValueIterator jit = data.begin(); jit != data.end(); jit++){
if (!Util::Procs::isActive(jit.memberName())){
startStream(jit.memberName(), data[jit.memberName()]);
}
}
}
void CheckStreams(Json::Value & in, Json::Value & out){
if (in.isObject() && (in.size() > 0)){
for (Json::ValueIterator jit = in.begin(); jit != in.end(); jit++){
if (out.isObject() && out.isMember(jit.memberName())){
Log("STRM", std::string("Updated stream ")+jit.memberName());
if (in[jit.memberName()] != out[jit.memberName()]){
Log("STRM", std::string("Updated stream ")+jit.memberName());
Util::Procs::Stop(jit.memberName());
startStream(jit.memberName(), in[jit.memberName()]);
}
}else{
Log("STRM", std::string("New stream ")+jit.memberName());
startStream(jit.memberName(), in[jit.memberName()]);
}
}
if (out.isObject() && (out.size() > 0)){
for (Json::ValueIterator jit = out.begin(); jit != out.end(); jit++){
if (!in.isMember(jit.memberName())){
Log("STRM", std::string("Deleted stream ")+jit.memberName());
}
}
if (out.isObject() && (out.size() > 0)){
for (Json::ValueIterator jit = out.begin(); jit != out.end(); jit++){
if (!in.isMember(jit.memberName())){
Log("STRM", std::string("Deleted stream ")+jit.memberName());
Util::Procs::Stop(jit.memberName());
}
}
}
@ -265,13 +356,24 @@ void CheckStreams(Json::Value & in, Json::Value & out){
}
int main(int argc, char ** argv){
//setup signal handler
struct sigaction new_action;
new_action.sa_handler = signal_handler;
sigemptyset (&new_action.sa_mask);
new_action.sa_flags = 0;
sigaction(SIGINT, &new_action, NULL);
sigaction(SIGHUP, &new_action, NULL);
sigaction(SIGTERM, &new_action, NULL);
sigaction(SIGPIPE, &new_action, NULL);
RSA_Load(); // Load GearBox public key
Util::Config C;
C.confsection = "API";
C.parseArgs(argc, argv);
C.parseFile();
time_t lastuplink = 0;
Socket::Server API_Socket = Socket::Server(C.listen_port, C.interface, true);
time_t processchecker = 0;
API_Socket = Socket::Server(C.listen_port, C.interface, true);
Socket::Server Stats_Socket = Socket::Server("/tmp/ddv_statistics", true);
Util::setUser(C.username);
if (C.daemon_mode){
@ -279,6 +381,7 @@ int main(int argc, char ** argv){
}
Socket::Connection Incoming;
std::vector< ConnectedUser > users;
std::vector<Socket::Connection> buffers;
Json::Value Request = Json::Value(Json::objectValue);
Json::Value Response = Json::Value(Json::objectValue);
Json::Reader JsonParse;
@ -291,6 +394,12 @@ int main(int argc, char ** argv){
while (API_Socket.connected()){
usleep(100000); //sleep for 100 ms - prevents 100% CPU time
if (time(0) - processchecker > 10){
processchecker = time(0);
CheckProtocols(Storage["config"]["protocols"]);
CheckAllStreams(Storage["streams"]);
}
if (time(0) - lastuplink > UPLINK_INTERVAL){
lastuplink = time(0);
bool gotUplink = false;
@ -324,7 +433,7 @@ int main(int argc, char ** argv){
uplink->H.BuildRequest();
uplink->writebuffer += uplink->H.BuildResponse("200", "OK");
uplink->H.Clean();
Log("UPLK", "Sending server data to uplink.");
//Log("UPLK", "Sending server data to uplink.");
}else{
Log("UPLK", "Could not connect to uplink.");
}
@ -332,6 +441,18 @@ int main(int argc, char ** argv){
Incoming = API_Socket.accept();
if (Incoming.connected()){users.push_back(Incoming);}
Incoming = Stats_Socket.accept();
if (Incoming.connected()){buffers.push_back(Incoming);}
if (buffers.size() > 0){
for( std::vector< Socket::Connection >::iterator it = buffers.end() - 1; it >= buffers.begin(); it--) {
if (!it->connected()){
it->close();
buffers.erase(it);
break;
}
}
}
if (users.size() > 0){
for( std::vector< ConnectedUser >::iterator it = users.end() - 1; it >= users.begin(); it--) {
if (!it->C.connected() || it->logins > 3){
@ -349,7 +470,7 @@ int main(int argc, char ** argv){
// They are assumed to be authorized, but authorization to gearbox is still done.
// This authorization uses the compiled-in username and password (account).
if (!JsonParse.parse(it->H.body, Request, false)){
Log("HTTP", "Failed to parse JSON: "+it->H.body);
Log("HTTP", "Failed to parse body JSON: "+it->H.body);
Response["authorize"]["status"] = "INVALID";
}else{
if (Request["authorize"]["status"] != "OK"){
@ -380,13 +501,13 @@ int main(int argc, char ** argv){
Storage["log"].clear();
Storage["statistics"].clear();
}
Log("UPLK", "Received data from uplink.");
WriteFile("config.json", Storage.toStyledString());
//Log("UPLK", "Received data from uplink.");
//WriteFile("config.json", Storage.toStyledString());
}
}
}else{
if (!JsonParse.parse(it->H.GetVar("command"), Request, false)){
Log("HTTP", "Failed to parse JSON: "+it->H.GetVar("command"));
Log("HTTP", "Failed to parse command JSON: "+it->H.GetVar("command"));
Response["authorize"]["status"] = "INVALID";
}else{
std::cout << "Request: " << Request.toStyledString() << std::endl;
@ -428,6 +549,8 @@ int main(int argc, char ** argv){
}
}
}
Util::Procs::StopAll();
WriteFile("config.json", Storage.toStyledString());
std::cout << "Killed all processes, wrote config to disk. Exiting." << std::endl;
return 0;
}

View file

@ -178,9 +178,12 @@ bool HTTP::Parser::parse(){
if (seenHeaders){
if (length > 0){
if (HTTPbuffer.length() >= length){
if ((method != "HTTP/1.0") && (method != "HTTP/1.1")){
body = HTTPbuffer.substr(0, length);
parseVars(body); //parse POST variables
}
body = HTTPbuffer.substr(0, length);
HTTPbuffer.erase(0, length);
parseVars(body); //parse POST variables
return true;
}else{
return false;

View file

@ -29,7 +29,7 @@ Socket::Connection::Connection(){
/// Close connection. The internal socket is closed and then set to -1.
void Socket::Connection::close(){
#if DEBUG >= 4
#if DEBUG >= 6
fprintf(stderr, "Socket closed.\n");
#endif
shutdown(sock, SHUT_RDWR);
@ -81,14 +81,24 @@ Socket::Connection::Connection(std::string address, bool nonblock){
/// \param port String containing the port to connect to.
/// \param nonblock Whether the socket should be nonblocking.
Socket::Connection::Connection(std::string host, int port, bool nonblock){
struct addrinfo *result, *rp;
struct addrinfo *result, *rp, hints;
Error = false;
Blocking = false;
std::stringstream ss;
ss << port;
if (getaddrinfo(host.c_str(), ss.str().c_str(), 0, &result) != 0){
memset(&hints, 0, sizeof(struct addrinfo));
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = AI_ADDRCONFIG;
hints.ai_protocol = 0;
hints.ai_canonname = NULL;
hints.ai_addr = NULL;
hints.ai_next = NULL;
int s = getaddrinfo(host.c_str(), ss.str().c_str(), &hints, &result);
if (s != 0){
#if DEBUG >= 1
fprintf(stderr, "Could not connect to %s:%i! Error: %s\n", host.c_str(), port, strerror(errno));
fprintf(stderr, "Could not connect to %s:%i! Error: %s\n", host.c_str(), port, gai_strerror(s));
#endif
close();
return;
@ -96,10 +106,11 @@ Socket::Connection::Connection(std::string host, int port, bool nonblock){
for (rp = result; rp != NULL; rp = rp->ai_next) {
sock = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
if (sock == -1){continue;}
if (connect(sock, rp->ai_addr, rp->ai_addrlen) != -1){break;}
if (sock < 0){continue;}
if (connect(sock, rp->ai_addr, rp->ai_addrlen) == 0){break;}
::close(sock);
}
freeaddrinfo(result);
if (rp == 0){
#if DEBUG >= 1
@ -534,18 +545,18 @@ Socket::Connection Socket::Server::accept(bool nonblock){
}else{
if (addrinfo.sin6_family == AF_INET6){
tmp.remotehost = inet_ntop(AF_INET6, &(addrinfo.sin6_addr), addrconv, INET6_ADDRSTRLEN);
#if DEBUG >= 4
#if DEBUG >= 6
fprintf(stderr,"IPv6 addr: %s\n", tmp.remotehost.c_str());
#endif
}
if (addrinfo.sin6_family == AF_INET){
tmp.remotehost = inet_ntop(AF_INET, &(((sockaddr_in*)&addrinfo)->sin_addr), addrconv, INET6_ADDRSTRLEN);
#if DEBUG >= 4
#if DEBUG >= 6
fprintf(stderr,"IPv4 addr: %s\n", tmp.remotehost.c_str());
#endif
}
if (addrinfo.sin6_family == AF_UNIX){
#if DEBUG >= 4
#if DEBUG >= 6
tmp.remotehost = ((sockaddr_un*)&addrinfo)->sun_path;
fprintf(stderr,"Unix socket, no address\n");
#endif
@ -557,7 +568,7 @@ Socket::Connection Socket::Server::accept(bool nonblock){
/// Close connection. The internal socket is closed and then set to -1.
void Socket::Server::close(){
#if DEBUG >= 4
#if DEBUG >= 6
fprintf(stderr, "ServerSocket closed.\n");
#endif
shutdown(sock, SHUT_RDWR);

View file

@ -5,7 +5,12 @@
#include <string.h>
#include <sys/types.h>
#include <signal.h>
#ifdef __FreeBSD__
#include <sys/wait.h>
#else
#include <wait.h>
#endif
#include <errno.h>
#include <iostream>
#include <sys/types.h>
@ -52,6 +57,7 @@ void Util::Procs::childsig_handler(int signum){
#if DEBUG >= 1
if (isActive(pname)){
std::cerr << "Process " << pname << " half-terminated." << std::endl;
Stop(pname);
}else{
std::cerr << "Process " << pname << " fully terminated." << std::endl;
}
@ -190,8 +196,11 @@ pid_t Util::Procs::Start(std::string name, std::string cmd, std::string cmd2){
/// Stops the named process, if running.
/// \arg name (Internal) name of process to stop
void Util::Procs::Stop(std::string name){
int max = 5;
while (isActive(name)){
Stop(getPid(name));
max--;
if (max <= 0){return;}
}
}