Added support for external writers

This commit is contained in:
Marco van Dijk 2022-09-14 15:01:51 +02:00 committed by Thulinma
parent 6921586622
commit 2b18a414b4
15 changed files with 510 additions and 154 deletions

View file

@ -7,6 +7,7 @@
#include "procs.h"
#include "timing.h"
#include "util.h"
#include "url.h"
#include <errno.h> // errno, ENOENT, EEXIST
#include <iomanip>
#include <iostream>
@ -186,7 +187,104 @@ namespace Util{
}
val = val.substr(startPos, length);
}
/// \brief splits a string on commas and returns a list of substrings
void splitString(std::string &src, char delim, std::deque<std::string> &result){
result.clear();
std::deque<uint64_t> positions;
uint64_t pos = src.find(delim, 0);
while (pos != std::string::npos){
positions.push_back(pos);
pos = src.find(delim, pos + 1);
}
if (positions.size() == 0){
result.push_back(src);
return;
}
uint64_t prevPos = 0;
for (int i = 0; i < positions.size(); i++) {
if (!prevPos){result.push_back(src.substr(prevPos, positions[i]));}
else{result.push_back(src.substr(prevPos + 1, positions[i] - prevPos - 1));}
prevPos = positions[i];
}
if (prevPos < src.size()){result.push_back(src.substr(prevPos + 1));}
}
/// \brief Connects the given file descriptor to a file or uploader binary
/// \param uri target URL or filepath
/// \param outFile file descriptor which will be used to send data
/// \param append whether to open this connection in truncate or append mode
bool externalWriter(const std::string & uri, int &outFile, bool append){
HTTP::URL target(uri);
// If it is a remote target, we might need to spawn an external binary
if (!target.isLocalPath()){
bool matchedProtocol = false;
// Read configured external writers
IPC::sharedPage extwriPage(EXTWRITERS, 0, false, false);
if (extwriPage.mapped){
Util::RelAccX extWri(extwriPage.mapped, false);
if (extWri.isReady()){
for (uint64_t i = 0; i < extWri.getEndPos(); i++){
// Retrieve binary config
std::string name = extWri.getPointer("name", i);
std::string cmdline = extWri.getPointer("cmdline", i);
Util::RelAccX protocols = Util::RelAccX(extWri.getPointer("protocols", i));
uint8_t protocolCount = protocols.getPresent();
JSON::Value protocolArray;
for (uint8_t idx = 0; idx < protocolCount; idx++){
protocolArray.append(protocols.getPointer("protocol", idx));
}
jsonForEach(protocolArray, protocol){
if (target.protocol != (*protocol).asStringRef()){ continue; }
HIGH_MSG("Using %s in order connect to URL with protocol %s", name.c_str(), target.protocol.c_str());
matchedProtocol = true;
// Split configured parameters for this writer on whitespace
// TODO: we might want to trim whitespaces and remove empty parameters
std::deque<std::string> parameterList;
Util::splitString(cmdline, ' ', parameterList);
// Build the startup command, which needs space for the program name, each parameter, the target url and a null at the end
char **cmd = (char**)malloc(sizeof(char*) * (parameterList.size() + 2));
size_t curArg = 0;
// Write each parameter
for (size_t j = 0; j < parameterList.size(); j++) {
cmd[curArg++] = (char*)parameterList[j].c_str();
}
// Write the target URL as the last positional argument then close with a null at the end
cmd[curArg++] = (char*)uri.c_str();
cmd[curArg++] = 0;
pid_t child = startConverted(cmd, outFile);
if (child == -1){
ERROR_MSG("'%s' process did not start, aborting", cmd[0]);
return false;
}
Util::Procs::forget(child);
free(cmd);
break;
}
if (matchedProtocol){ break; }
}
}
}
if (!matchedProtocol){
ERROR_MSG("Could not connect to '%s', since we do not have a configured external writer to handle '%s' protocols", uri.c_str(), target.protocol.c_str());
return false;
}
}else{
int flags = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH;
int mode = O_RDWR | O_CREAT | (append ? O_APPEND : O_TRUNC);
if (!Util::createPathFor(uri)){
ERROR_MSG("Cannot not create file %s: could not create parent folder", uri.c_str());
return false;
}
outFile = open(uri.c_str(), mode, flags);
if (outFile < 0){
ERROR_MSG("Failed to open file %s, error: %s", uri.c_str(), strerror(errno));
return false;
}
}
return true;
}
//Returns the time to wait in milliseconds for exponential back-off waiting.
//If currIter > maxIter, always returns 5ms to prevent tight eternal loops when mistakes are made
//Otherwise, exponentially increases wait time for a total of maxWait milliseconds after maxIter calls.
@ -337,6 +435,118 @@ namespace Util{
}
}
/// \brief Forks to a log converter, which spawns an external writer and pretty prints it stdout and stderr
pid_t startConverted(const char *const *argv, int &outFile){
int p[2];
if (pipe(p) == -1){
ERROR_MSG("Unable to create pipe in order to connect to the STDIN of the target binary");
return -1;
}
Util::Procs::fork_prepare();
pid_t converterPid = fork();
// Child process
if (converterPid == 0){
Util::Procs::fork_complete();
close(p[1]);
// Override signals
struct sigaction new_action;
new_action.sa_handler = SIG_IGN;
sigemptyset(&new_action.sa_mask);
new_action.sa_flags = 0;
sigaction(SIGINT, &new_action, NULL);
sigaction(SIGHUP, &new_action, NULL);
sigaction(SIGTERM, &new_action, NULL);
sigaction(SIGPIPE, &new_action, NULL);
// Start external writer
int fdOut = -1;
int fdErr = -1;
pid_t binPid = Util::Procs::StartPiped(argv, &p[0], &fdOut, &fdErr);
close(p[0]);
if (binPid == -1){
FAIL_MSG("Failed to start binary `%s`", argv[0]);
}
// Close all sockets in the socketList
for (std::set<int>::iterator it = Util::Procs::socketList.begin();
it != Util::Procs::socketList.end(); ++it){
close(*it);
}
// Okay, so.... hear me out here.
// This code normalizes the stdin and stdout file descriptors, so that
// they are connected to the pipes we're reading from the child process.
// This is not technically needed, but it makes it easier to debug pipe-
// related problems, and works around a nasty issue were left-over stdout
// connected to another converted process may keep that other process alive
// when it really shouldn't.
// This isn't pretty, it's not fully correct, but it's good enough for now.
// If somebody writes a prettier version of this, please do sent a pull request.
// Thanks <3
std::set<int> toClose;
while (fdErr == 0 || fdErr == 1){
int tmp = dup(fdErr);
if (tmp > -1){
toClose.insert(fdErr);
fdErr = tmp;
}
}
while (fdOut == 0 || fdOut == 1){
int tmp = dup(fdOut);
if (tmp > -1){
toClose.insert(fdOut);
fdOut = tmp;
}
}
while (toClose.size()){
close(*toClose.begin());
toClose.erase(toClose.begin());
}
dup2(fdErr, 1);
dup2(fdOut, 0);
close(fdErr);
close(fdOut);
// Read pipes and write to the stdErr of parent process
Util::logConverter(1, 0, 2, argv[0], binPid);
exit(0);
}
if (converterPid == -1){
FAIL_MSG("Failed to fork log converter for log handling!");
close(p[1]);
}else{
Util::Procs::remember(converterPid);
outFile = p[1];
}
close(p[0]);
Util::Procs::fork_complete();
return converterPid;
}
void logConverter(int inErr, int inOut, int out, const char *progName, pid_t pid){
Socket::Connection errStream(-1, inErr);
Socket::Connection outStream(-1, inOut);
errStream.setBlocking(false);
outStream.setBlocking(false);
while (errStream || outStream){
if (errStream.spool() || errStream.Received().size()){
while (errStream.Received().size()){
std::string &line = errStream.Received().get();
while (line.find('\r') != std::string::npos){line.erase(line.find('\r'));}
while (line.find('\n') != std::string::npos){line.erase(line.find('\n'));}
dprintf(out, "INFO|%s|%d||%s|%s\n", progName, pid, Util::streamName, line.c_str());
line.clear();
}
}else if (outStream.spool() || outStream.Received().size()){
while (outStream.Received().size()){
std::string &line = outStream.Received().get();
while (line.find('\r') != std::string::npos){line.erase(line.find('\r'));}
while (line.find('\n') != std::string::npos){line.erase(line.find('\n'));}
dprintf(out, "INFO|%s|%d||%s|%s\n", progName, pid, Util::streamName, line.c_str());
line.clear();
}
}else{Util::sleep(25);}
}
errStream.close();
outStream.close();
}
/// Parses log messages from the given file descriptor in, printing them to out, optionally
/// calling the given callback for each valid message. Closes the file descriptor on read error
void logParser(int in, int out, bool colored,