From 9104d68a5d3d2a07a75841734cb55f0cf2d19b2e Mon Sep 17 00:00:00 2001 From: Thulinma Date: Wed, 4 Jul 2018 11:36:57 +0200 Subject: [PATCH] Updated generic HTTP output to support websockets. Added basic websocket JSON push support. --- src/output/output_http.cpp | 46 +++++++++++++- src/output/output_http.h | 11 +++- src/output/output_json.cpp | 122 ++++++++++++++++++++++++++++--------- src/output/output_json.h | 12 +++- 4 files changed, 156 insertions(+), 35 deletions(-) diff --git a/src/output/output_http.cpp b/src/output/output_http.cpp index 74e12f1a..b50d8cac 100644 --- a/src/output/output_http.cpp +++ b/src/output/output_http.cpp @@ -6,6 +6,9 @@ namespace Mist { HTTPOutput::HTTPOutput(Socket::Connection & conn) : Output(conn) { + webSock = 0; + idleInterval = 0; + idleLast = 0; if (config->getString("ip").size()){ myConn.setHost(config->getString("ip")); } @@ -14,6 +17,13 @@ namespace Mist { } config->activate(); } + + HTTPOutput::~HTTPOutput() { + if (webSock){ + delete webSock; + webSock = 0; + } + } void HTTPOutput::init(Util::Config * cfg){ Output::init(cfg); @@ -153,6 +163,21 @@ namespace Mist { } void HTTPOutput::requestHandler(){ + if (idleInterval && (Util::bootMS() > idleLast + idleInterval)){ + onIdle(); + idleLast = Util::bootMS(); + } + if (webSock){ + if (webSock->readFrame()){ + onWebsocketFrame(); + idleLast = Util::bootMS(); + }else{ + if (!isBlocking && !parseData){ + Util::sleep(100); + } + } + return; + } if (myConn.Received().size() && myConn.spool()){ DEBUG_MSG(DLVL_DONTEVEN, "onRequest"); onRequest(); @@ -197,12 +222,14 @@ namespace Mist { onRequest(); } if (!myConn.Received().size()){ - Util::sleep(500); + if (!isBlocking && !parseData){ + Util::sleep(100); + } } } }else{ if (!isBlocking && !parseData){ - Util::sleep(500); + Util::sleep(100); } } } @@ -224,8 +251,23 @@ namespace Mist { } INFO_MSG("Received request %s", H.getUrl().c_str()); + //Handle upgrade to websocket if the output supports it + if (doesWebsockets() && H.GetHeader("Upgrade") == "websocket"){ + INFO_MSG("Switching to Websocket mode"); + preWebsocketConnect(); + webSock = new HTTP::Websocket(myConn, H); + if (!(*webSock)){ + delete webSock; + webSock = 0; + return; + } + onWebsocketConnect(); + H.Clean(); + return; + } preHTTP(); onHTTP(); + idleLast = Util::bootMS(); if (!H.bufferChunks){ H.Clean(); } diff --git a/src/output/output_http.h b/src/output/output_http.h index bad8ae6a..9cf37718 100644 --- a/src/output/output_http.h +++ b/src/output/output_http.h @@ -1,6 +1,7 @@ #pragma once #include #include +#include #include "output.h" namespace Mist { @@ -8,18 +9,26 @@ namespace Mist { class HTTPOutput : public Output { public: HTTPOutput(Socket::Connection & conn); - virtual ~HTTPOutput(){}; + virtual ~HTTPOutput(); static void init(Util::Config * cfg); void onRequest(); virtual void onFail(); virtual void onHTTP(){}; + virtual void onIdle(){}; + virtual void onWebsocketFrame(){}; + virtual void onWebsocketConnect(){}; + virtual void preWebsocketConnect(){}; virtual void requestHandler(); virtual void preHTTP(); static bool listenMode(){return false;} + virtual bool doesWebsockets(){return false;} void reConnector(std::string & connector); std::string getHandler(); bool parseRange(uint64_t & byteStart, uint64_t & byteEnd); protected: HTTP::Parser H; + HTTP::Websocket * webSock; + uint32_t idleInterval; + uint64_t idleLast; }; } diff --git a/src/output/output_json.cpp b/src/output/output_json.cpp index b1750ef2..ca7b682c 100644 --- a/src/output/output_json.cpp +++ b/src/output/output_json.cpp @@ -4,18 +4,13 @@ namespace Mist { OutJSON::OutJSON(Socket::Connection & conn) : HTTPOutput(conn){ - ws = 0; realTime = 0; + bootMsOffset = 0; keepReselecting = false; dupcheck = false; + noReceive = false; } - OutJSON::~OutJSON() { - if (ws){ - delete ws; - ws = 0; - } - } - + void OutJSON::init(Util::Config * cfg){ HTTPOutput::init(cfg); capa["name"] = "JSON"; @@ -31,7 +26,7 @@ namespace Mist { capa["methods"][1u]["priority"] = 0ll; capa["methods"][1u]["url_rel"] = "/$.json"; } - + void OutJSON::sendNext(){ JSON::Value jPack = thisPacket.toJSON(); if (dupcheck){ @@ -40,8 +35,8 @@ namespace Mist { } lastVal = jPack; } - if (ws){ - ws->sendFrame(jPack.toString()); + if (webSock){ + webSock->sendFrame(jPack.toString()); return; } if (!jsonp.size()){ @@ -83,7 +78,7 @@ namespace Mist { static bool recursive = false; if (recursive){return true;} recursive = true; - if (keepReselecting){ + if (keepReselecting && !isPushing()){ uint64_t maxTimer = 7200; while (--maxTimer && nProxy.userClient.isAlive() && keepGoing()){ Util::wait(500); @@ -99,18 +94,23 @@ namespace Mist { } } recursive = false; - return false; } if (!jsonp.size() && !first){ - myConn.SendNow("]);\n\n", 5); + myConn.SendNow("]\n", 2); } myConn.close(); return false; } - void OutJSON::onHTTP(){ - std::string method = H.method; - jsonp = ""; + void OutJSON::onWebsocketConnect(){ + sentHeader = true; + parseData = !noReceive; + } + + void OutJSON::preWebsocketConnect(){ + if (H.GetVar("password") != ""){pushPass = H.GetVar("password");} + if (H.GetVar("password").size() || H.GetVar("push").size()){noReceive = true;} + if (H.GetVar("persist") != ""){keepReselecting = true;} if (H.GetVar("dedupe") != ""){ dupcheck = true; @@ -126,22 +126,84 @@ namespace Mist { } } } + } + + void OutJSON::onWebsocketFrame(){ + if (!isPushing()){ + if (!allowPush(pushPass)){ + onFinish(); + return; + } + } + if (!bootMsOffset){ + if (myMeta.bootMsOffset){ + bootMsOffset = myMeta.bootMsOffset; + }else{ + bootMsOffset = Util::bootMS(); + } + } + //We now know we're allowed to push. Read a JSON object. + JSON::Value inJSON = JSON::fromString(webSock->data, webSock->data.size()); + if (!inJSON || !inJSON.isObject()){ + //Ignore empty and/or non-parsable JSON packets + MEDIUM_MSG("Ignoring non-JSON object: %s", webSock->data); + return; + } + //Let's create a new track for pushing purposes, if needed + if (!pushTrack){ + pushTrack = 1; + while (myMeta.tracks.count(pushTrack)){ + ++pushTrack; + } + } + myMeta.tracks[pushTrack].type = "meta"; + myMeta.tracks[pushTrack].codec = "JSON"; + //We have a track set correctly. Let's attempt to buffer a frame. + inJSON["trackid"] = (long long)pushTrack; + inJSON["datatype"] = "meta"; + lastSendTime = Util::bootMS(); + if (!inJSON.isMember("unix")){ + //Base timestamp on arrival time + inJSON["time"] = (long long)(lastSendTime - bootMsOffset); + }else{ + //Base timestamp on unix time + inJSON["time"] = (long long)((lastSendTime - bootMsOffset) + (Util::epoch() - Util::bootSecs()) * 1000); + } + inJSON["bmo"] = (long long)bootMsOffset; + lastVal = inJSON; + std::string packedJson = inJSON.toNetPacked(); + DTSC::Packet newPack(packedJson.data(), packedJson.size(), true); + bufferLivePacket(newPack); + if (!idleInterval){idleInterval = 100;} + if (isBlocking){setBlocking(false);} + } + + /// Repeats last JSON packet every 5 seconds to keep stream alive. + void OutJSON::onIdle(){ + if (nProxy.trackState[pushTrack] != FILL_ACC){ + continueNegotiate(pushTrack); + if (nProxy.trackState[pushTrack] == FILL_ACC){ + idleInterval = 5000; + } + return; + } + lastVal["time"] = (long long)(lastVal["time"].asInt() + (Util::bootMS() - lastSendTime)); + lastSendTime = Util::bootMS(); + lastVal.netPrepare(); + std::string packedJson = lastVal.toNetPacked(); + DTSC::Packet newPack(packedJson.data(), packedJson.size(), true); + myMeta.tracks[pushTrack].type = "meta"; + myMeta.tracks[pushTrack].codec = "JSON"; + bufferLivePacket(newPack); + } + + void OutJSON::onHTTP(){ + std::string method = H.method; + preWebsocketConnect();//Not actually a websocket, but we need to do the same checks + jsonp = ""; if (H.GetVar("callback") != ""){jsonp = H.GetVar("callback");} if (H.GetVar("jsonp") != ""){jsonp = H.GetVar("jsonp");} - if (H.GetHeader("Upgrade") == "websocket"){ - ws = new HTTP::Websocket(myConn, H); - if (!(*ws)){ - delete ws; - ws = 0; - return; - } - sentHeader = true; - parseData = true; - wantRequest = false; - return; - } - H.Clean(); H.setCORSHeaders(); if(method == "OPTIONS" || method == "HEAD"){ diff --git a/src/output/output_json.h b/src/output/output_json.h index 7959b4a2..3a290fc6 100644 --- a/src/output/output_json.h +++ b/src/output/output_json.h @@ -5,21 +5,29 @@ namespace Mist { class OutJSON : public HTTPOutput { public: OutJSON(Socket::Connection & conn); - ~OutJSON(); static void init(Util::Config * cfg); void onHTTP(); + void onIdle(); + virtual void onWebsocketFrame(); + virtual void onWebsocketConnect(); + virtual void preWebsocketConnect(); bool onFinish(); void onFail(); void sendNext(); void sendHeader(); + bool doesWebsockets(){return true;} protected: JSON::Value lastVal; + uint64_t lastSendTime; bool keepReselecting; std::string jsonp; + std::string pushPass; + uint64_t pushTrack; + int64_t bootMsOffset; bool dupcheck; std::set nodup; bool first; - HTTP::Websocket * ws; + bool noReceive; }; }