From 2b18a414b4150165965d56f9adbcff58b678e0be Mon Sep 17 00:00:00 2001 From: Marco van Dijk Date: Wed, 14 Sep 2022 15:01:51 +0200 Subject: [PATCH] Added support for external writers --- CMakeLists.txt | 5 + lib/defines.h | 4 + lib/dtsc.cpp | 164 +++----------- lib/dtsc.h | 2 +- lib/util.cpp | 210 ++++++++++++++++++ lib/util.h | 4 + src/controller/controller.cpp | 4 + src/controller/controller_api.cpp | 8 + .../controller_external_writers.cpp | 173 +++++++++++++++ src/controller/controller_external_writers.h | 14 ++ src/controller/meson.build | 1 + src/input/input.cpp | 18 +- src/output/output.cpp | 26 +-- test/converter.cpp | 30 +++ test/meson.build | 1 + 15 files changed, 510 insertions(+), 154 deletions(-) create mode 100644 src/controller/controller_external_writers.cpp create mode 100644 src/controller/controller_external_writers.h create mode 100644 test/converter.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index c42300f3..42f66781 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -845,6 +845,7 @@ add_custom_command(OUTPUT generated/server.html.h # MistController - Build # ######################################## add_executable(MistController + src/controller/controller_external_writers.h src/controller/controller_limits.h src/controller/controller_uplink.h src/controller/controller_api.h @@ -857,6 +858,7 @@ add_executable(MistController src/controller/controller_push.h src/controller/controller_variables.h src/controller/controller.cpp + src/controller/controller_external_writers.cpp src/controller/controller_updater.cpp src/controller/controller_streams.cpp src/controller/controller_storage.cpp @@ -906,6 +908,9 @@ add_test(URLTest COMMAND urltest) add_executable(logtest test/log.cpp ${BINARY_DIR}/mist/.headers) target_link_libraries(logtest mist) add_test(LOGTest COMMAND logtest) +add_executable(logconvertertest test/converter.cpp ${BINARY_DIR}/mist/.headers) +target_link_libraries(logconvertertest mist) +add_test(LOGConverterTest COMMAND logconvertertest) add_executable(downloadertest test/downloader.cpp ${BINARY_DIR}/mist/.headers) target_link_libraries(downloadertest mist) add_test(DownloaderTest COMMAND downloadertest) diff --git a/lib/defines.h b/lib/defines.h index 9f04f8e4..92c0c5d0 100644 --- a/lib/defines.h +++ b/lib/defines.h @@ -206,6 +206,10 @@ static inline void show_stackframe(){} #define CUSTOM_VARIABLES_INITSIZE 64 * 1024 +#define EXTWRITERS "MstExtWriters" + +#define EXTWRITERS_INITSIZE 1 * 1024 * 1024 + #define SEM_STATISTICS "/MstStat" #define SEM_USERS "/MstUser%s" //%s stream name diff --git a/lib/dtsc.cpp b/lib/dtsc.cpp index 7f07ccc4..c40ae698 100644 --- a/lib/dtsc.cpp +++ b/lib/dtsc.cpp @@ -950,12 +950,8 @@ namespace DTSC{ inFile.read(scanBuf, fileSize); inFile.close(); - - size_t offset = 8; - if (!memcmp(scanBuf, "DTP2", 4)){offset = 20;} - - DTSC::Scan src(scanBuf + offset, fileSize - offset); - reInit(_streamName, src); + DTSC::Packet pkt(scanBuf, fileSize, true); + reInit(_streamName, pkt.getScan()); free(scanBuf); } @@ -2693,134 +2689,6 @@ namespace DTSC{ return result; } - /// Writes the current Meta object in DTSH format to the given filename - void Meta::toFile(const std::string &fName) const{ - std::string lVars; - size_t lVarSize = 0; - if (inputLocalVars.size()){ - lVars = inputLocalVars.toString(); - lVarSize = 2 + 14 + 5 + lVars.size(); - } - - std::ofstream oFile(fName.c_str(), std::ios::binary | std::ios::ate); - oFile.write(DTSC::Magic_Header, 4); - oFile.write(c32(lVarSize + getSendLen() - 8), 4); - oFile.write("\340", 1); - if (getVod()){oFile.write("\000\003vod\001\000\000\000\000\000\000\000\001", 14);} - if (getLive()){oFile.write("\000\004live\001\000\000\000\000\000\000\000\001", 15);} - oFile.write("\000\007version\001", 10); - oFile.write(c64(DTSH_VERSION), 8); - if (lVarSize){ - oFile.write("\000\016inputLocalVars\002", 17); - oFile.write(c32(lVars.size()), 4); - oFile.write(lVars.data(), lVars.size()); - } - oFile.write("\000\006tracks\340", 9); - for (std::map::const_iterator it = tracks.begin(); it != tracks.end(); it++){ - if (!it->second.parts.getPresent()){continue;} - std::string tmp = getTrackIdentifier(it->first, true); - oFile.write(c16(tmp.size()), 2); - oFile.write(tmp.data(), tmp.size()); - oFile.write("\340", 1); // Begin track object - - size_t fragCount = it->second.fragments.getPresent(); - oFile.write("\000\011fragments\002", 12); - oFile.write(c32(fragCount * DTSH_FRAGMENT_SIZE), 4); - for (size_t i = 0; i < fragCount; i++){ - oFile.write(c32(it->second.fragments.getInt("duration", i)), 4); - oFile.put(it->second.fragments.getInt("keys", i)); - oFile.write(c32(it->second.fragments.getInt("firstkey", i) + 1), 4); - oFile.write(c32(it->second.fragments.getInt("size", i)), 4); - } - - size_t keyCount = it->second.keys.getPresent(); - oFile.write("\000\004keys\002", 7); - oFile.write(c32(keyCount * DTSH_KEY_SIZE), 4); - for (size_t i = 0; i < keyCount; i++){ - oFile.write(c64(it->second.keys.getInt("bpos", i)), 8); - oFile.write(c24(it->second.keys.getInt("duration", i)), 3); - oFile.write(c32(it->second.keys.getInt("number", i) + 1), 4); - oFile.write(c16(it->second.keys.getInt("parts", i)), 2); - oFile.write(c64(it->second.keys.getInt("time", i)), 8); - } - oFile.write("\000\010keysizes\002,", 11); - oFile.write(c32(keyCount * 4), 4); - for (size_t i = 0; i < keyCount; i++){ - oFile.write(c32(it->second.keys.getInt("size", i)), 4); - } - - size_t partCount = it->second.parts.getPresent(); - oFile.write("\000\005parts\002", 8); - oFile.write(c32(partCount * DTSH_PART_SIZE), 4); - for (size_t i = 0; i < partCount; i++){ - oFile.write(c24(it->second.parts.getInt("size", i)), 3); - oFile.write(c24(it->second.parts.getInt("duration", i)), 3); - oFile.write(c24(it->second.parts.getInt("offset", i)), 3); - } - - oFile.write("\000\007trackid\001", 10); - oFile.write(c64(it->second.track.getInt("id")), 8); - - if (it->second.track.getInt("missedFrags")){ - oFile.write("\000\014missed_frags\001", 15); - oFile.write(c64(it->second.track.getInt("missedFrags")), 8); - } - - oFile.write("\000\007firstms\001", 10); - oFile.write(c64(it->second.track.getInt("firstms")), 8); - oFile.write("\000\006lastms\001", 9); - oFile.write(c64(it->second.track.getInt("lastms")), 8); - - oFile.write("\000\003bps\001", 6); - oFile.write(c64(it->second.track.getInt("bps")), 8); - - oFile.write("\000\006maxbps\001", 9); - oFile.write(c64(it->second.track.getInt("maxbps")), 8); - - tmp = getInit(it->first); - oFile.write("\000\004init\002", 7); - oFile.write(c32(tmp.size()), 4); - oFile.write(tmp.data(), tmp.size()); - - tmp = getCodec(it->first); - oFile.write("\000\005codec\002", 8); - oFile.write(c32(tmp.size()), 4); - oFile.write(tmp.data(), tmp.size()); - - tmp = getLang(it->first); - if (tmp.size() && tmp != "und"){ - oFile.write("\000\004lang\002", 7); - oFile.write(c32(tmp.size()), 4); - oFile.write(tmp.data(), tmp.size()); - } - - tmp = getType(it->first); - oFile.write("\000\004type\002", 7); - oFile.write(c32(tmp.size()), 4); - oFile.write(tmp.data(), tmp.size()); - - if (tmp == "audio"){ - oFile.write("\000\004rate\001", 7); - oFile.write(c64(it->second.track.getInt("rate")), 8); - oFile.write("\000\004size\001", 7); - oFile.write(c64(it->second.track.getInt("size")), 8); - oFile.write("\000\010channels\001", 11); - oFile.write(c64(it->second.track.getInt("channels")), 8); - }else if (tmp == "video"){ - oFile.write("\000\005width\001", 8); - oFile.write(c64(it->second.track.getInt("width")), 8); - oFile.write("\000\006height\001", 9); - oFile.write(c64(it->second.track.getInt("height")), 8); - oFile.write("\000\004fpks\001", 7); - oFile.write(c64(it->second.track.getInt("fpks")), 8); - } - oFile.write("\000\000\356", 3); // End this track Object - } - oFile.write("\000\000\356", 3); // End tracks object - oFile.write("\000\000\356", 3); // End global object - oFile.close(); - } - /// Converts the current Meta object to JSON format void Meta::toJSON(JSON::Value &res, bool skipDynamic, bool tracksOnly) const{ res.null(); @@ -2885,10 +2753,29 @@ namespace DTSC{ if (getSource() != ""){res["source"] = getSource();} } + /// Writes the current Meta object in DTSH format to the given uri + void Meta::toFile(const std::string &uri) const{ + // Create writing socket + int outFd = -1; + if (!Util::externalWriter(uri, outFd, false)){return;} + Socket::Connection outFile(outFd, -1); + if (outFile){ + send(outFile, false, getValidTracks(), false); + outFile.close(); + } + } + /// Sends the current Meta object through a socket in DTSH format void Meta::send(Socket::Connection &conn, bool skipDynamic, std::set selectedTracks, bool reID) const{ + std::string lVars; + size_t lVarSize = 0; + if (inputLocalVars.size()){ + lVars = inputLocalVars.toString(); + lVarSize = 2 + 14 + 5 + lVars.size(); + } + conn.SendNow(DTSC::Magic_Header, 4); - conn.SendNow(c32(getSendLen(skipDynamic, selectedTracks) - 8), 4); + conn.SendNow(c32(lVarSize + getSendLen(skipDynamic, selectedTracks) - 8), 4); conn.SendNow("\340", 1); if (getVod()){conn.SendNow("\000\003vod\001\000\000\000\000\000\000\000\001", 14);} if (getLive()){conn.SendNow("\000\004live\001\000\000\000\000\000\000\000\001", 15);} @@ -2898,6 +2785,11 @@ namespace DTSC{ conn.SendNow("\000\010unixzero\001", 11); conn.SendNow(c64(Util::unixMS() - Util::bootMS() + getBootMsOffset()), 8); } + if (lVarSize){ + conn.SendNow("\000\016inputLocalVars\002", 17); + conn.SendNow(c32(lVars.size()), 4); + conn.SendNow(lVars.data(), lVars.size()); + } conn.SendNow("\000\006tracks\340", 9); for (std::set::const_iterator it = selectedTracks.begin(); it != selectedTracks.end(); it++){ std::string tmp = getTrackIdentifier(*it, true); @@ -2923,7 +2815,7 @@ namespace DTSC{ conn.SendNow(c32(fragments.getInt("duration", i + fragBegin)), 4); conn.SendNow(std::string(1, (char)fragments.getInt("keys", i + fragBegin))); - conn.SendNow(c32(fragments.getInt("firstkey", i + fragBegin)), 4); + conn.SendNow(c32(fragments.getInt("firstkey", i + fragBegin) + 1), 4); conn.SendNow(c32(fragments.getInt("size", i + fragBegin)), 4); } diff --git a/lib/dtsc.h b/lib/dtsc.h index efaa0801..3197193a 100644 --- a/lib/dtsc.h +++ b/lib/dtsc.h @@ -465,7 +465,7 @@ namespace DTSC{ void remap(const std::string &_streamName = ""); uint64_t getSendLen(bool skipDynamic = false, std::set selectedTracks = std::set()) const; - void toFile(const std::string &fName) const; + void toFile(const std::string &uri) const; void send(Socket::Connection &conn, bool skypDynamic = false, std::set selectedTracks = std::set(), bool reID = false) const; void toJSON(JSON::Value &res, bool skipDynamic = true, bool tracksOnly = false) const; diff --git a/lib/util.cpp b/lib/util.cpp index 60b0bd65..0c8b932f 100644 --- a/lib/util.cpp +++ b/lib/util.cpp @@ -7,6 +7,7 @@ #include "procs.h" #include "timing.h" #include "util.h" +#include "url.h" #include // errno, ENOENT, EEXIST #include #include @@ -186,7 +187,104 @@ namespace Util{ } val = val.substr(startPos, length); } + + /// \brief splits a string on commas and returns a list of substrings + void splitString(std::string &src, char delim, std::deque &result){ + result.clear(); + std::deque positions; + uint64_t pos = src.find(delim, 0); + while (pos != std::string::npos){ + positions.push_back(pos); + pos = src.find(delim, pos + 1); + } + if (positions.size() == 0){ + result.push_back(src); + return; + } + uint64_t prevPos = 0; + for (int i = 0; i < positions.size(); i++) { + if (!prevPos){result.push_back(src.substr(prevPos, positions[i]));} + else{result.push_back(src.substr(prevPos + 1, positions[i] - prevPos - 1));} + prevPos = positions[i]; + } + if (prevPos < src.size()){result.push_back(src.substr(prevPos + 1));} + } + /// \brief Connects the given file descriptor to a file or uploader binary + /// \param uri target URL or filepath + /// \param outFile file descriptor which will be used to send data + /// \param append whether to open this connection in truncate or append mode + bool externalWriter(const std::string & uri, int &outFile, bool append){ + HTTP::URL target(uri); + // If it is a remote target, we might need to spawn an external binary + if (!target.isLocalPath()){ + bool matchedProtocol = false; + // Read configured external writers + IPC::sharedPage extwriPage(EXTWRITERS, 0, false, false); + if (extwriPage.mapped){ + Util::RelAccX extWri(extwriPage.mapped, false); + if (extWri.isReady()){ + for (uint64_t i = 0; i < extWri.getEndPos(); i++){ + // Retrieve binary config + std::string name = extWri.getPointer("name", i); + std::string cmdline = extWri.getPointer("cmdline", i); + Util::RelAccX protocols = Util::RelAccX(extWri.getPointer("protocols", i)); + uint8_t protocolCount = protocols.getPresent(); + JSON::Value protocolArray; + for (uint8_t idx = 0; idx < protocolCount; idx++){ + protocolArray.append(protocols.getPointer("protocol", idx)); + } + jsonForEach(protocolArray, protocol){ + if (target.protocol != (*protocol).asStringRef()){ continue; } + HIGH_MSG("Using %s in order connect to URL with protocol %s", name.c_str(), target.protocol.c_str()); + matchedProtocol = true; + // Split configured parameters for this writer on whitespace + // TODO: we might want to trim whitespaces and remove empty parameters + std::deque parameterList; + Util::splitString(cmdline, ' ', parameterList); + // Build the startup command, which needs space for the program name, each parameter, the target url and a null at the end + char **cmd = (char**)malloc(sizeof(char*) * (parameterList.size() + 2)); + size_t curArg = 0; + // Write each parameter + for (size_t j = 0; j < parameterList.size(); j++) { + cmd[curArg++] = (char*)parameterList[j].c_str(); + } + // Write the target URL as the last positional argument then close with a null at the end + cmd[curArg++] = (char*)uri.c_str(); + cmd[curArg++] = 0; + + pid_t child = startConverted(cmd, outFile); + if (child == -1){ + ERROR_MSG("'%s' process did not start, aborting", cmd[0]); + return false; + } + Util::Procs::forget(child); + free(cmd); + break; + } + if (matchedProtocol){ break; } + } + } + } + if (!matchedProtocol){ + ERROR_MSG("Could not connect to '%s', since we do not have a configured external writer to handle '%s' protocols", uri.c_str(), target.protocol.c_str()); + return false; + } + }else{ + int flags = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH; + int mode = O_RDWR | O_CREAT | (append ? O_APPEND : O_TRUNC); + if (!Util::createPathFor(uri)){ + ERROR_MSG("Cannot not create file %s: could not create parent folder", uri.c_str()); + return false; + } + outFile = open(uri.c_str(), mode, flags); + if (outFile < 0){ + ERROR_MSG("Failed to open file %s, error: %s", uri.c_str(), strerror(errno)); + return false; + } + } + return true; + } //Returns the time to wait in milliseconds for exponential back-off waiting. //If currIter > maxIter, always returns 5ms to prevent tight eternal loops when mistakes are made //Otherwise, exponentially increases wait time for a total of maxWait milliseconds after maxIter calls. @@ -337,6 +435,118 @@ namespace Util{ } } + /// \brief Forks to a log converter, which spawns an external writer and pretty prints it stdout and stderr + pid_t startConverted(const char *const *argv, int &outFile){ + int p[2]; + if (pipe(p) == -1){ + ERROR_MSG("Unable to create pipe in order to connect to the STDIN of the target binary"); + return -1; + } + Util::Procs::fork_prepare(); + pid_t converterPid = fork(); + // Child process + if (converterPid == 0){ + Util::Procs::fork_complete(); + close(p[1]); + // Override signals + struct sigaction new_action; + new_action.sa_handler = SIG_IGN; + sigemptyset(&new_action.sa_mask); + new_action.sa_flags = 0; + sigaction(SIGINT, &new_action, NULL); + sigaction(SIGHUP, &new_action, NULL); + sigaction(SIGTERM, &new_action, NULL); + sigaction(SIGPIPE, &new_action, NULL); + // Start external writer + int fdOut = -1; + int fdErr = -1; + pid_t binPid = Util::Procs::StartPiped(argv, &p[0], &fdOut, &fdErr); + close(p[0]); + if (binPid == -1){ + FAIL_MSG("Failed to start binary `%s`", argv[0]); + } + // Close all sockets in the socketList + for (std::set::iterator it = Util::Procs::socketList.begin(); + it != Util::Procs::socketList.end(); ++it){ + close(*it); + } + // Okay, so.... hear me out here. + // This code normalizes the stdin and stdout file descriptors, so that + // they are connected to the pipes we're reading from the child process. + // This is not technically needed, but it makes it easier to debug pipe- + // related problems, and works around a nasty issue were left-over stdout + // connected to another converted process may keep that other process alive + // when it really shouldn't. + // This isn't pretty, it's not fully correct, but it's good enough for now. + // If somebody writes a prettier version of this, please do sent a pull request. + // Thanks <3 + std::set toClose; + while (fdErr == 0 || fdErr == 1){ + int tmp = dup(fdErr); + if (tmp > -1){ + toClose.insert(fdErr); + fdErr = tmp; + } + } + while (fdOut == 0 || fdOut == 1){ + int tmp = dup(fdOut); + if (tmp > -1){ + toClose.insert(fdOut); + fdOut = tmp; + } + } + while (toClose.size()){ + close(*toClose.begin()); + toClose.erase(toClose.begin()); + } + dup2(fdErr, 1); + dup2(fdOut, 0); + close(fdErr); + close(fdOut); + // Read pipes and write to the stdErr of parent process + Util::logConverter(1, 0, 2, argv[0], binPid); + exit(0); + } + if (converterPid == -1){ + FAIL_MSG("Failed to fork log converter for log handling!"); + close(p[1]); + }else{ + Util::Procs::remember(converterPid); + outFile = p[1]; + } + close(p[0]); + Util::Procs::fork_complete(); + return converterPid; + } + + void logConverter(int inErr, int inOut, int out, const char *progName, pid_t pid){ + Socket::Connection errStream(-1, inErr); + Socket::Connection outStream(-1, inOut); + errStream.setBlocking(false); + outStream.setBlocking(false); + while (errStream || outStream){ + if (errStream.spool() || errStream.Received().size()){ + while (errStream.Received().size()){ + std::string &line = errStream.Received().get(); + while (line.find('\r') != std::string::npos){line.erase(line.find('\r'));} + while (line.find('\n') != std::string::npos){line.erase(line.find('\n'));} + dprintf(out, "INFO|%s|%d||%s|%s\n", progName, pid, Util::streamName, line.c_str()); + line.clear(); + } + }else if (outStream.spool() || outStream.Received().size()){ + while (outStream.Received().size()){ + std::string &line = outStream.Received().get(); + while (line.find('\r') != std::string::npos){line.erase(line.find('\r'));} + while (line.find('\n') != std::string::npos){line.erase(line.find('\n'));} + dprintf(out, "INFO|%s|%d||%s|%s\n", progName, pid, Util::streamName, line.c_str()); + line.clear(); + } + }else{Util::sleep(25);} + } + errStream.close(); + outStream.close(); + } + /// Parses log messages from the given file descriptor in, printing them to out, optionally /// calling the given callback for each valid message. Closes the file descriptor on read error void logParser(int in, int out, bool colored, diff --git a/lib/util.h b/lib/util.h index 8fc39f13..e9f28ef7 100644 --- a/lib/util.h +++ b/lib/util.h @@ -15,6 +15,8 @@ namespace Util{ void stringToLower(std::string &val); size_t replace(std::string &str, const std::string &from, const std::string &to); void stringTrim(std::string &val); + void splitString(std::string &val, char delim, std::deque &result); + bool externalWriter(const std::string & file, int &outFile, bool append = false); int64_t expBackoffMs(const size_t currIter, const size_t maxIter, const int64_t maxWait); @@ -64,6 +66,8 @@ namespace Util{ void logParser(int in, int out, bool colored, void callback(const std::string &, const std::string &, const std::string &, uint64_t, bool) = 0); void redirectLogsIfNeeded(); + pid_t startConverted(const char *const *argv, int &outFile); + void logConverter(int inErr, int inOut, int out, const char *progName, pid_t pid); /// Holds type, size and offset for RelAccX class internal data fields. class RelAccXFieldData{ diff --git a/src/controller/controller.cpp b/src/controller/controller.cpp index f8d69a4e..d8aaf053 100644 --- a/src/controller/controller.cpp +++ b/src/controller/controller.cpp @@ -2,6 +2,7 @@ /// Contains all code for the controller executable. #include "controller_api.h" +#include "controller_external_writers.h" #include "controller_capabilities.h" #include "controller_connectors.h" #include "controller_push.h" @@ -586,6 +587,9 @@ int main_loop(int argc, char **argv){ #endif /*LTS-END*/ + // Init external writer config + Controller::externalWritersToShm(); + // start main loop while (Controller::conf.is_active){ Controller::conf.serveThreadedSocket(Controller::handleAPIConnection); diff --git a/src/controller/controller_api.cpp b/src/controller/controller_api.cpp index e0b1d000..f5fd3494 100644 --- a/src/controller/controller_api.cpp +++ b/src/controller/controller_api.cpp @@ -4,6 +4,7 @@ #include "controller_statistics.h" #include "controller_storage.h" #include "controller_streams.h" +#include "controller_external_writers.h" #include //for browse API call #include #include @@ -1203,6 +1204,13 @@ void Controller::handleAPICommands(JSON::Value &Request, JSON::Value &Response){ if (Request.isMember("variable_add")){Controller::addVariable(Request["variable_add"], Response["variable_list"]);} if (Request.isMember("variable_remove")){Controller::removeVariable(Request["variable_remove"], Response["variable_list"]);} + if (Request.isMember("external_writer_remove")){Controller::removeExternalWriter(Request["external_writer_remove"]);} + if (Request.isMember("external_writer_add")){Controller::addExternalWriter(Request["external_writer_add"]);} + if (Request.isMember("external_writer_remove") || Request.isMember("external_writer_add") || + Request.isMember("external_writer_list")){ + Controller::listExternalWriters(Response["external_writer_list"]); + } + Controller::writeConfig(); Controller::configChanged = false; diff --git a/src/controller/controller_external_writers.cpp b/src/controller/controller_external_writers.cpp new file mode 100644 index 00000000..b09eef0f --- /dev/null +++ b/src/controller/controller_external_writers.cpp @@ -0,0 +1,173 @@ +#include "controller_external_writers.h" +#include "controller_statistics.h" +#include "controller_storage.h" +#include +#include +#include +#include +#include +#include +#include + +namespace Controller{ + // Size of the shared memory page + static uint64_t pageSize = EXTWRITERS_INITSIZE; + + /// \brief Writes external writers from the server config to shared memory + void externalWritersToShm(){ + uint64_t writerCount = Controller::Storage["extwriters"].size(); + IPC::sharedPage writersPage(EXTWRITERS, pageSize, false, false); + // If we have an existing page, set the reload flag + if (writersPage.mapped){ + writersPage.master = true; + Util::RelAccX binAccx = Util::RelAccX(writersPage.mapped, false); + // Check if we need a bigger page + uint64_t sizeRequired = binAccx.getOffset() + binAccx.getRSize() * writerCount; + if (pageSize < sizeRequired){pageSize = sizeRequired;} + binAccx.setReload(); + } + // Close & unlink any existing page and create a new one + writersPage.close(); + writersPage.init(EXTWRITERS, pageSize, true, false); + Util::RelAccX exwriAccx = Util::RelAccX(writersPage.mapped, false); + exwriAccx = Util::RelAccX(writersPage.mapped, false); + exwriAccx.addField("name", RAX_32STRING); + exwriAccx.addField("cmdline", RAX_256STRING); + exwriAccx.addField("protocols", RAX_NESTED, RAX_64STRING * writerCount * 8); + // Set amount of records that can fit and how many will be used + uint64_t reqCount = (pageSize - exwriAccx.getOffset()) / exwriAccx.getRSize(); + exwriAccx.setRCount(reqCount); + exwriAccx.setPresent(reqCount); + exwriAccx.setEndPos(writerCount); + // Do the same for the nested protocol field + uint64_t index = 0; + jsonForEach(Controller::Storage["extwriters"], it){ + std::string name = (*it)[0u].asString(); + std::string cmdline = (*it)[1u].asString(); + exwriAccx.setString("name", name, index); + exwriAccx.setString("cmdline", cmdline, index); + // Create nested field for source match + uint8_t protocolCount = (*it)[2u].size(); + Util::RelAccX protocolAccx = Util::RelAccX(exwriAccx.getPointer("protocols", index), false); + protocolAccx.addField("protocol", RAX_64STRING); + protocolAccx.setRCount(protocolCount); + protocolAccx.setPresent(protocolCount); + protocolAccx.setEndPos(protocolCount); + uint8_t binIt = 0; + jsonForEach((*it)[2u], protIt){ + std::string thisProtocol = (*protIt).asString(); + protocolAccx.setString("protocol", thisProtocol, binIt); + binIt++; + } + index++; + protocolAccx.setReady(); + } + exwriAccx.setReady(); + // Leave the page in memory after returning + writersPage.master = false; + } + + /// \brief Adds a new generic writer binary to the server config + /// The request should contain: + /// - name: name given to this binary in order to edit/remove it's entry in Mists config + /// - cmdline: command line including arguments + /// - supported URL protocols: used to identify for what targets we need to run the executable + void addExternalWriter(JSON::Value &request){ + std::string name; + std::string cmdline; + JSON::Value protocols; + bool isNew = true; + if (request.isArray()){ + if(request.size() == 4){ + name = request[0u].asString(); + cmdline = request[1u].asString(); + protocols = request[2u]; + }else{ + ERROR_MSG("Cannot add external writer, as the request contained %u variables. Required variables are: name, cmdline and protocols", request.size()); + return; + } + }else{ + name = request["name"].asString(); + cmdline = request["cmdline"].asString(); + protocols = request["protocols"]; + } + //convert protocols from string to array if needed + if (protocols.isString()){protocols.append(protocols.asString());} + if (!name.size()){ + ERROR_MSG("Blank or missing name in request"); + return; + } + if (!cmdline.size()){ + ERROR_MSG("Blank or missing cmdline in request"); + return; + } + if (!protocols.size()){ + ERROR_MSG("Missing protocols in request"); + return; + } + if (name.size() > 31){ + name = name.substr(0, 31); + WARN_MSG("Maximum name length is 31 characters, truncating name to '%s'", name.c_str()); + } + if (cmdline.size() > 255){ + cmdline.erase(255); + WARN_MSG("Maximum cmdline length is 255 characters, truncating cmdline to '%s'", cmdline.c_str()); + } + jsonForEach(protocols, protIt){ + if ((*protIt).size() > 63){ + (*protIt) = (*protIt).asString().substr(0, 63); + WARN_MSG("Maximum protocol length is 63 characters, truncating protocol to '%s'", (*protIt).asStringRef().c_str()); + } + } + + // Check if we have an existing variable with the same name to modify + jsonForEach(Controller::Storage["extwriters"], it){ + if ((*it)[0u].asString() == name){ + INFO_MSG("Modifying existing external writer '%s'", name.c_str()); + (*it)[1u] = cmdline; + (*it)[2u] = protocols; + isNew = false; + break; + } + } + // Else push a new custom variable to the list + if (isNew){ + INFO_MSG("Adding new external writer '%s'", name.c_str()); + JSON::Value thisVar; + thisVar.append(name); + thisVar.append(cmdline); + thisVar.append(protocols); + Controller::Storage["extwriters"].append(thisVar); + } + // Modify shm + externalWritersToShm(); + } + + /// \brief Fills output with all defined external writers + void listExternalWriters(JSON::Value &output){ + output = Controller::Storage["extwriters"]; + } + + /// \brief Removes the external writer name contained in the request from shm and the sever config + void removeExternalWriter(const JSON::Value &request){ + std::string name; + if (request.isString()){ + name = request.asStringRef(); + }else if (request.isArray()){ + name = request[0u].asStringRef(); + }else if (request.isMember("name")){ + name = request["name"].asStringRef(); + } + if (!name.size()){ + WARN_MSG("Aborting request to remove an external writer, as no name was given"); + return; + } + // Modify config + jsonForEach(Controller::Storage["extwriters"], it){ + if ((*it)[0u].asString() == name){it.remove();} + } + // Modify shm + externalWritersToShm(); + } +}// namespace Controller + diff --git a/src/controller/controller_external_writers.h b/src/controller/controller_external_writers.h new file mode 100644 index 00000000..e2ebc2b0 --- /dev/null +++ b/src/controller/controller_external_writers.h @@ -0,0 +1,14 @@ +#include +#include +#include +#include + +namespace Controller{ + // API calls to manage external writers + void addExternalWriter(JSON::Value &request); + void listExternalWriters(JSON::Value &output); + void removeExternalWriter(const JSON::Value &request); + + // internal use only + void externalWritersToShm(); +}// namespace Controller diff --git a/src/controller/meson.build b/src/controller/meson.build index 0b91f005..28142309 100644 --- a/src/controller/meson.build +++ b/src/controller/meson.build @@ -3,6 +3,7 @@ executables += { 'name': 'MistController', 'sources' : [ files( 'controller.cpp', + 'controller_external_writers.cpp', 'controller_updater.cpp', 'controller_streams.cpp', 'controller_storage.cpp', diff --git a/src/input/input.cpp b/src/input/input.cpp index 9d1e66fd..db82cc3f 100644 --- a/src/input/input.cpp +++ b/src/input/input.cpp @@ -8,11 +8,11 @@ #include #include #include -#include #include #include #include #include +#include #include #include @@ -587,6 +587,7 @@ namespace Mist{ } // close file file.close(); + // Export DTSH to a file or remote target outMeta.toFile(fileName + ".dtsh"); } @@ -1496,7 +1497,20 @@ namespace Mist{ return true; } } - meta.reInit(config->getString("streamname"), config->getString("input") + ".dtsh"); + // Try to read any existing DTSH file + std::string fileName = config->getString("input") + ".dtsh"; + HIGH_MSG("Loading metadata for stream '%s' from file '%s'", streamName.c_str(), fileName.c_str()); + char *scanBuf; + uint64_t fileSize; + HTTP::URIReader inFile(fileName); + if (!inFile){return false;} + inFile.readAll(scanBuf, fileSize); + inFile.close(); + if (!fileSize){return false;} + DTSC::Packet pkt(scanBuf, fileSize, true); + HIGH_MSG("Retrieved header of %lu bytes", fileSize); + meta.reInit(streamName, pkt.getScan()); + if (meta.version != DTSH_VERSION){ INFO_MSG("Updating wrong version header file from version %u to %u", meta.version, DTSH_VERSION); return false; diff --git a/src/output/output.cpp b/src/output/output.cpp index 96680905..c66b3d71 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -2287,24 +2287,20 @@ namespace Mist{ sentHeader = true; } + /// \brief Makes the generic writer available to output classes + /// \param file target URL or filepath + /// \param append whether to open this connection in truncate or append mode + /// \param conn connection which will be used to send data. Will use Output's internal myConn if not initialised bool Output::connectToFile(std::string file, bool append, Socket::Connection *conn){ - if (!conn){conn = &myConn;} - int flags = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH; - int mode = O_RDWR | O_CREAT | (append ? O_APPEND : O_TRUNC); - if (!Util::createPathFor(file)){ - ERROR_MSG("Cannot not create file %s: could not create parent folder", file.c_str()); - return false; - } - int outFile = open(file.c_str(), mode, flags); - if (outFile < 0){ - ERROR_MSG("Failed to open file %s, error: %s", file.c_str(), strerror(errno)); - return false; - } - if (*conn){ + int outFile = -1; + if (!conn) {conn = &myConn;} + bool isFileTarget = HTTP::URL(file).isLocalPath(); + if (!Util::externalWriter(file, outFile, append)){return false;} + if (*conn && isFileTarget) { flock(conn->getSocket(), LOCK_UN | LOCK_NB); } // Lock the file in exclusive mode to ensure no other processes write to it - if(flock(outFile, LOCK_EX | LOCK_NB)){ + if(isFileTarget && flock(outFile, LOCK_EX | LOCK_NB)){ ERROR_MSG("Failed to lock file %s, error: %s", file.c_str(), strerror(errno)); return false; } @@ -2319,7 +2315,7 @@ namespace Mist{ int r = dup2(outFile, conn->getSocket()); if (r == -1){ - ERROR_MSG("Failed to create an alias for the socket using dup2: %s.", strerror(errno)); + ERROR_MSG("Failed to create an alias for the socket %d -> %d using dup2: %s.", outFile, conn->getSocket(), strerror(errno)); return false; } close(outFile); diff --git a/test/converter.cpp b/test/converter.cpp new file mode 100644 index 00000000..9d7af39e --- /dev/null +++ b/test/converter.cpp @@ -0,0 +1,30 @@ +#include +#include +#include +#include +#include +#include +#include + +int main(int argc, char **argv){ + int fd = -1; + Util::printDebugLevel = 10; + pid_t pid = Util::startConverted(argv + 1, fd); + INFO_MSG("PID=%u", pid); + Socket::Connection in(-1, 0); + Socket::Connection logOut(fd, -1); + while (Util::Procs::childRunning(pid)){ + if (in.spool()){ + while (in.Received().size()){ + logOut.SendNow(in.Received().get()); + in.Received().get().clear(); + } + } + if (!in){ + logOut.close(); + } + Util::sleep(1000); + } + INFO_MSG("Shutting down"); + return 0; +} diff --git a/test/meson.build b/test/meson.build index 18b465b4..d127db1d 100644 --- a/test/meson.build +++ b/test/meson.build @@ -2,6 +2,7 @@ # Testing binaries that are not unit tests, but intended for manual use logtest = executable('logtest', 'log.cpp', dependencies: libmist_dep) +convertertest = executable('convertertest', 'converter.cpp', dependencies: libmist_dep) downloadertest = executable('downloadertest', 'downloader.cpp', dependencies: libmist_dep) urireadertest = executable('urireadertest', 'urireader.cpp', dependencies: libmist_dep) jsontest = executable('jsontest', 'json.cpp', dependencies: libmist_dep)