Changed HTTPTS-based playlist writer into a generic format-agnostic playlist writer

Change-Id: I503110bca3a557342ce9a21c64824a916725a79b
This commit is contained in:
Marco 2022-08-25 11:55:18 +02:00 committed by Thulinma
parent e55038bc46
commit 03771ccac2
4 changed files with 409 additions and 173 deletions

View file

@ -6,6 +6,8 @@
#include <sys/types.h> #include <sys/types.h>
#include <sys/wait.h> #include <sys/wait.h>
#include <unistd.h> #include <unistd.h>
#include <iomanip>
#include <fstream>
#include "output.h" #include "output.h"
#include <mist/bitfields.h> #include <mist/bitfields.h>
@ -15,6 +17,8 @@
#include <mist/stream.h> #include <mist/stream.h>
#include <mist/timing.h> #include <mist/timing.h>
#include <mist/util.h> #include <mist/util.h>
#include <mist/urireader.h>
#include <sys/file.h>
/*LTS-START*/ /*LTS-START*/
#include <arpa/inet.h> #include <arpa/inet.h>
@ -108,8 +112,6 @@ namespace Mist{
firstData = true; firstData = true;
newUA = true; newUA = true;
lastPushUpdate = 0; lastPushUpdate = 0;
previousFile = "";
currentFile = "";
lastRecv = Util::bootSecs(); lastRecv = Util::bootSecs();
if (myConn){ if (myConn){
@ -520,6 +522,116 @@ namespace Mist{
return highest; return highest;
} }
/// \brief Removes entries in the playlist based on age or a maximum number of segments allowed
/// \param playlistBuffer: the contents of the playlist file. Will be edited to contain fewer entries if applicable
/// \param targetAge: maximum age of a segment in seconds. If 0, will not remove segments based on age
/// \param maxEntries: maximum amount of segments that are allowed to appear in the playlist
/// If 0, will not remove segments based on the segment count
/// \param segmentCount: current counter of segments that have been segmented as part of this stream
/// \param segmentsRemoved: counter of segments that have been removed previously from the playlist
/// \param curTime: the current local timestamp in milliseconds
/// \param targetDuration: value to fill in for the EXT-X-TARGETDURATION entry in the playlist
/// \param playlistLocation: the location of the playlist, used to find the path to segments when removing them
void Output::reinitPlaylist(std::string &playlistBuffer, uint64_t &targetAge, uint64_t &maxEntries,
uint64_t &segmentCount, uint64_t &segmentsRemoved, uint64_t &curTime,
std::string targetDuration, HTTP::URL &playlistLocation){
std::string newBuffer;
std::istringstream stream(playlistBuffer);
std::string line;
std::string curDateString;
std::string curDurationString;
// Quits early if we have no more segments we need to remove
bool done = false;
bool hasSegment = false;
while (std::getline(stream, line)){
if (!line.size()){continue;}
// Copy the rest of the file as is
if (done){
newBuffer += line + "\n";
continue;
}
// Ignore init fields
if (strncmp(line.c_str(), "#EXTM3U", 7) == 0){continue;}
if (strncmp(line.c_str(), "#EXT-X-VERSION", 14) == 0){continue;}
if (strncmp(line.c_str(), "#EXT-X-PLAYLIST-TYPE", 20) == 0){continue;}
if (strncmp(line.c_str(), "#EXT-X-TARGETDURATION", 21) == 0){continue;}
if (strncmp(line.c_str(), "#EXT-X-MEDIA-SEQUENCE", 21) == 0){continue;}
if (!hasSegment && strncmp(line.c_str(), "#EXT-X-DISCONTINUITY", 20) == 0){continue;}
// Save current segment info
if (strncmp(line.c_str(), "#EXTINF", 7) == 0){
curDurationString = line;
continue;
}
if (strncmp(line.c_str(), "#EXT-X-PROGRAM-DATE-TIME", 21) == 0){
curDateString = line;
continue;
}
// Pass along any other lines starting with a # character as is
if (line[0] == '#'){
newBuffer += line + "\n";
continue;
}
// The current line should be a segment path at this point
// If we are above the max segment count or age, ignore this segment and reset info fields
if (maxEntries && (segmentCount - segmentsRemoved >= maxEntries)){
HIGH_MSG("Dropping segment #%lu from the playlist due to the playlist reaching it's max size of %lu segments", segmentsRemoved, maxEntries);
curDateString = "";
curDurationString = "";
segmentsRemoved++;
std::string segPath = playlistLocation.link(line).getFilePath();
if(unlink(segPath.c_str())){
FAIL_MSG("Failed to remove segment at '%s'. Error: '%s'", segPath.c_str(), strerror(errno));
}else{
INFO_MSG("Removed segment at '%s'", segPath.c_str());
}
continue;
}
if (targetAge && curDateString.size() > 25){
uint64_t segmentDiff = Util::getUTCTimeDiff(curDateString.substr(25), curTime);
if (segmentDiff > targetAge){
HIGH_MSG("Dropping segment #%lu from the playlist due to old age (%lu s)", segmentsRemoved, segmentDiff);
// If the segment is too old, ignore and reset fields
curDurationString = "";
curDateString = "";
segmentsRemoved++;
std::string segPath = playlistLocation.link(line).getFilePath();
if(unlink(segPath.c_str())){
FAIL_MSG("Failed to remove segment at '%s'. Error: '%s'", segPath.c_str(), strerror(errno));
}else{
INFO_MSG("Removed segment at '%s'", segPath.c_str());
}
continue;
}
}
hasSegment = true;
// Write segment info to the new buffer
if (curDateString.size()){
newBuffer += curDateString + "\n";
curDateString = "";
}
if (curDurationString.size()){
newBuffer += curDurationString + "\n";
curDurationString = "";
}
newBuffer += line + "\n";
// If we reach this point, the conditions of max age and entries have been met
done = true;
}
// Write out new init data to the playlist buffer
playlistBuffer = "#EXTM3U\n#EXT-X-VERSION:3\n";
// Set the playlist as immutable when segmenting a non-live input
if (!M.getLive()){
playlistBuffer += "#EXT-X-PLAYLIST-TYPE:VOD\n";
// Set the playlist as append only when segmenting a live input without removing older entries
} else if (!maxEntries && !targetAge){
playlistBuffer += "#EXT-X-PLAYLIST-TYPE:EVENT\n";
}
// Else don't add a playlist type at all, as the playlist will not be append only or immutable
playlistBuffer += "#EXT-X-TARGETDURATION:" + targetDuration + "\n#EXT-X-MEDIA-SEQUENCE:" + JSON::Value(segmentsRemoved).asString() + "\n";
// Finally append the rest of the playlist
playlistBuffer += newBuffer;
}
/// Loads the page for the given trackId and keyNum into memory. /// Loads the page for the given trackId and keyNum into memory.
/// Overwrites any existing page for the same trackId. /// Overwrites any existing page for the same trackId.
/// Automatically calls thisPacket.null() if necessary. /// Automatically calls thisPacket.null() if necessary.
@ -1198,7 +1310,9 @@ namespace Mist{
// Make sure that inlineRestartCapable outputs with splitting enabled only stop right before // Make sure that inlineRestartCapable outputs with splitting enabled only stop right before
// keyframes This works because this function is executed right BEFORE sendNext(), causing // keyframes This works because this function is executed right BEFORE sendNext(), causing
// thisPacket to be the next packet in the newly splitted file. // thisPacket to be the next packet in the newly splitted file.
if (!thisPacket.getFlag("keyframe")){return false;} if (thisIdx != getMainSelectedTrack() || (!thisPacket.getFlag("keyframe") && M.getType(thisIdx) == "video")){
return false;
}
// is this a split point? // is this a split point?
if (targetParams.count("nxt-split") && atoll(targetParams["nxt-split"].c_str()) <= lastPacketTime){ if (targetParams.count("nxt-split") && atoll(targetParams["nxt-split"].c_str()) <= lastPacketTime){
INFO_MSG("Split point reached"); INFO_MSG("Split point reached");
@ -1225,10 +1339,138 @@ namespace Mist{
/// request URL (if any) /// request URL (if any)
/// ~~~~~~~~~~~~~~~ /// ~~~~~~~~~~~~~~~
int Output::run(){ int Output::run(){
// Variables used for segmenting the output
uint64_t segmentCount = 0;
uint64_t segmentsRemoved = 0;
HTTP::URL playlistLocation;
std::string playlistLocationString;
std::string playlistBuffer;
std::string currentTarget;
uint64_t currentStartTime = 0;
uint64_t maxEntries = 0;
uint64_t targetAge = 0;
std::string targetDuration;
bool reInitPlaylist = false;
Socket::Connection plsConn;
uint64_t systemBoot;
std::string origTarget;
const char* origTargetPtr = getenv("MST_ORIG_TARGET");
if (origTargetPtr){
origTarget = origTargetPtr;
if (origTarget.rfind('?') != std::string::npos){
origTarget.erase(origTarget.rfind('?'));
}
}else if (config->hasOption("target")){
origTarget = config->getString("target");
}
Util::streamVariables(origTarget, streamName);
if (targetParams.count("maxEntries")){
maxEntries = atoll(targetParams["maxEntries"].c_str());
}
if (targetParams.count("targetAge")){
targetAge = atoll(targetParams["targetAge"].c_str());
}
// When segmenting to a playlist, handle any existing files and init some data
if (targetParams.count("m3u8")){
// Load system boot time from the global config
systemBoot = Util::getGlobalConfig("systemBoot").asInt();
// fall back to local calculation if loading from global config fails
if (!systemBoot){systemBoot = (Util::unixMS() - Util::bootMS());}
// Create a new or connect to an existing playlist file
if (!plsConn){
playlistLocation = HTTP::URL(origTarget).link(targetParams["m3u8"]);
if (playlistLocation.isLocalPath()){
playlistLocationString = playlistLocation.getFilePath();
INFO_MSG("Segmenting to local playlist '%s'", playlistLocationString.c_str());
// Check if we already have a playlist at the target location
std::ifstream inFile(playlistLocationString.c_str());
if (inFile.good()){
std::string line;
// If appending, remove endlist and count segments
if (targetParams.count("append")){
while (std::getline(inFile, line)) {
if (strncmp("#EXTINF", line.c_str(), 7) == 0){
segmentCount++;
}else if (strcmp("#EXT-X-ENDLIST", line.c_str()) == 0){
INFO_MSG("Stripping line `#EXT-X-ENDLIST`");
continue;
}
playlistBuffer += line + '\n';
}
playlistBuffer += "#EXT-X-DISCONTINUITY\n";
INFO_MSG("Appending to existing local playlist file '%s'", playlistLocationString.c_str());
INFO_MSG("Found %lu prior segments", segmentCount);
}else{
// Remove all segments referenced in the playlist
while (std::getline(inFile, line)) {
if (line[0] == '#'){
continue;
}else{
std::string segPath = playlistLocation.link(line).getFilePath();
if(unlink(segPath.c_str())){
FAIL_MSG("Failed to remove segment at '%s'. Error: '%s'", segPath.c_str(), strerror(errno));
}else{
INFO_MSG("Removed segment at '%s'", segPath.c_str());
}
}
}
INFO_MSG("Overwriting existing local playlist file '%s'", playlistLocationString.c_str());
reInitPlaylist = true;
}
}else{
INFO_MSG("Creating new local playlist file '%s'", playlistLocationString.c_str());
reInitPlaylist = true;
}
config->getOption("target", true).append(playlistLocationString);
}else{
playlistLocationString = playlistLocation.getUrl();
// Disable sliding window playlists, as the current external writer
// implementation requires us to keep a single connection to the playlist open
maxEntries = 0;
targetAge = 0;
// Check if there is an existing playlist at the target location
HTTP::URIReader outFile(playlistLocationString);
if (outFile){
// If so, init the buffer with remote data
if (targetParams.count("append")){
char *dataPtr;
size_t dataLen;
outFile.readAll(dataPtr, dataLen);
std::string existingBuffer(dataPtr, dataLen);
std::istringstream inFile(existingBuffer);
std::string line;
while (std::getline(inFile, line)) {
if (strncmp("#EXTINF", line.c_str(), 7) == 0){
segmentCount++;
}else if (strcmp("#EXT-X-ENDLIST", line.c_str()) == 0){
INFO_MSG("Stripping line `#EXT-X-ENDLIST`");
continue;
}
playlistBuffer += line + '\n';
}
playlistBuffer += "#EXT-X-DISCONTINUITY\n";
INFO_MSG("Found %lu prior segments", segmentCount);
INFO_MSG("Appending to existing remote playlist file '%s'", playlistLocationString.c_str());
}else{
WARN_MSG("Overwriting existing remote playlist file '%s'", playlistLocationString.c_str());
}
}else{
INFO_MSG("Creating new remote playlist file '%s'", playlistLocationString.c_str());
}
}
}
// By default split into a new segment after 60 seconds
if (!targetParams.count("split")){
targetParams["split"] = "60";
}
targetDuration = targetParams["split"];
}
Comms::sessionConfigCache(); Comms::sessionConfigCache();
/*LTS-START*/ /*LTS-START*/
// Connect to file target, if needed // Connect to file target, if needed
if (isFileTarget()){ if (isFileTarget()){
isRecordingToFile = true;
if (!streamName.size()){ if (!streamName.size()){
WARN_MSG("Recording unconnected %s output to file! Cancelled.", capa["name"].asString().c_str()); WARN_MSG("Recording unconnected %s output to file! Cancelled.", capa["name"].asString().c_str());
onFail("Unconnected recording output", true); onFail("Unconnected recording output", true);
@ -1240,16 +1482,44 @@ namespace Mist{
onFail("Stream not available for recording", true); onFail("Stream not available for recording", true);
return 3; return 3;
} }
if (config->getString("target") == "-"){ initialSeek();
// Initialises the playlist if we are segmenting the output with a playlist
if (targetParams.count("m3u8")){
if (reInitPlaylist){
uint64_t unixMs = M.getBootMsOffset() + systemBoot + currentStartTime;
reinitPlaylist(playlistBuffer, targetAge, maxEntries, segmentCount, segmentsRemoved, unixMs, targetDuration, playlistLocation);
}
// Do not open the playlist just yet if this is a non-live source
if (M.getLive()){
connectToFile(playlistLocationString, false, &plsConn);
// Write initial contents to the playlist file
if (!plsConn){
FAIL_MSG("Failed to open a connection to playlist file `%s` for segmenting", playlistLocationString.c_str());
Util::logExitReason("Failed to open a connection to playlist file `%s` for segmenting", playlistLocationString.c_str());
return 1;
}else if (playlistBuffer.size()){
// Do not write to the playlist intermediately if we are outputting a VOD playlist
plsConn.SendNow(playlistBuffer);
// Clear the buffer if we will only be appending lines instead of overwriting the entire playlist file
if (!maxEntries && !targetAge) {playlistBuffer = "";}
}
}
}
currentStartTime = currentTime();
std::string newTarget = origTarget;
Util::replace(newTarget, "$currentMediaTime", JSON::Value(currentStartTime).asString());
Util::replace(newTarget, "$segmentCounter", JSON::Value(segmentCount).asString());
currentTarget = newTarget;
if (newTarget == "-"){
INFO_MSG("Outputting %s to stdout with %s format", streamName.c_str(), INFO_MSG("Outputting %s to stdout with %s format", streamName.c_str(),
capa["name"].asString().c_str()); capa["name"].asString().c_str());
}else{ }else{
if (!connectToFile(config->getString("target"), targetParams.count("append"))){ if (!connectToFile(newTarget, targetParams.count("append"))){
onFail("Could not connect to the target for recording", true); onFail("Could not connect to the target for recording", true);
return 3; return 3;
} }
INFO_MSG("Recording %s to %s with %s format", streamName.c_str(), INFO_MSG("Recording %s to %s with %s format", streamName.c_str(),
config->getString("target").c_str(), capa["name"].asString().c_str()); newTarget.c_str(), capa["name"].asString().c_str());
} }
parseData = true; parseData = true;
wantRequest = false; wantRequest = false;
@ -1284,7 +1554,9 @@ namespace Mist{
if (prepareNext()){ if (prepareNext()){
if (thisPacket){ if (thisPacket){
lastPacketTime = thisTime; lastPacketTime = thisTime;
if (firstPacketTime == 0xFFFFFFFFFFFFFFFFull){firstPacketTime = lastPacketTime;} if (firstPacketTime == 0xFFFFFFFFFFFFFFFFull){
firstPacketTime = lastPacketTime;
}
// slow down processing, if real time speed is wanted // slow down processing, if real time speed is wanted
if (realTime && buffer.getSyncMode()){ if (realTime && buffer.getSyncMode()){
@ -1343,17 +1615,90 @@ namespace Mist{
} }
if (reachedPlannedStop()){ if (reachedPlannedStop()){
const char *origTarget = getenv("MST_ORIG_TARGET");
targetParams.erase("nxt-split"); targetParams.erase("nxt-split");
if (inlineRestartCapable() && origTarget && !reachedPlannedStop()){ if (inlineRestartCapable() && !reachedPlannedStop()){
// Write the segment to the playlist if applicable
if (targetParams.count("m3u8")){
// We require an active connection to the playlist
// except for VOD, where we connect and write at the end of segmenting
if (!plsConn && M.getLive()){
FAIL_MSG("Lost connection to playlist file `%s` during segmenting", playlistLocationString.c_str());
Util::logExitReason("Lost connection to playlist file `%s` during segmenting", playlistLocationString.c_str());
break;
}
std::string segment = HTTP::URL(currentTarget).getLinkFrom(playlistLocation);
if (M.getLive()){
uint64_t unixMs = M.getBootMsOffset() + systemBoot + currentStartTime;
playlistBuffer += "#EXT-X-PROGRAM-DATE-TIME:" + Util::getUTCStringMillis(unixMs) + "\n";
}
INFO_MSG("Adding new segment `%s` of %lums to playlist '%s'", segment.c_str(), lastPacketTime - currentStartTime, playlistLocationString.c_str());
// Append duration & TS filename to playlist file
std::stringstream tmp;
double segmentDuration = (lastPacketTime - currentStartTime) / 1000.0;
tmp << "#EXTINF:" << std::fixed << std::setprecision(3) << segmentDuration << ",\n"+ segment + "\n";
playlistBuffer += tmp.str();
// Check if the targetDuration is still valid
if (segmentDuration > atoll(targetDuration.c_str())){
// Set the new targetDuration to the ceil of the segment duration
targetDuration = JSON::Value(uint64_t(segmentDuration) + 1).asString();
WARN_MSG("Segment #%lu has a longer duration than the target duration. Adjusting the targetDuration to %s seconds", segmentCount, targetDuration.c_str());
// Round the target split time down to ensure we split on the keyframe for very long keyframe intervals
targetParams["split"] = JSON::Value(uint64_t(segmentDuration)).asString();
// Modify the buffer to contain the new targetDuration
if (!M.getLive()){
uint64_t unixMs = M.getBootMsOffset() + systemBoot + currentStartTime;
reinitPlaylist(playlistBuffer, targetAge, maxEntries, segmentCount, segmentsRemoved, unixMs, targetDuration, playlistLocation);
}else if (!maxEntries && !targetAge){
// If we are appending to an existing playlist, we need to recover the playlistBuffer and reopen the playlist
HTTP::URIReader inFile(playlistLocationString);
char *newBuffer;
uint64_t bytesRead;
inFile.readAll(newBuffer, bytesRead);
playlistBuffer = std::string(newBuffer, bytesRead) + playlistBuffer;
// Reinit the playlist with the new targetDuration
uint64_t unixMs = M.getBootMsOffset() + systemBoot + currentStartTime;
reinitPlaylist(playlistBuffer, targetAge, maxEntries, segmentCount, segmentsRemoved, unixMs, targetDuration, playlistLocation);
connectToFile(playlistLocationString, false, &plsConn);
}
// Else we are in a sliding window playlist, so it will already get overwritten
}
// Remove older entries in the playlist
if (maxEntries || targetAge){
uint64_t unixMs = M.getBootMsOffset() + systemBoot + currentStartTime;
reinitPlaylist(playlistBuffer, targetAge, maxEntries, segmentCount, segmentsRemoved, unixMs, targetDuration, playlistLocation);
}
// Do not write to the playlist intermediately if we are outputting a VOD playlist
if (M.getLive()){
// Clear the buffer if we will only be appending lines instead of overwriting the entire playlist file
if (!maxEntries && !targetAge) {
plsConn.SendNow(playlistBuffer);
playlistBuffer = "";
// Else re-open the file to force an overwrite
}else if(connectToFile(playlistLocationString, false, &plsConn)){
plsConn.SendNow(playlistBuffer);
}
}
}
// Keep track of filenames written, so that they can be added to the playlist file
std::string newTarget = origTarget; std::string newTarget = origTarget;
Util::streamVariables(newTarget, streamName); currentStartTime = lastPacketTime;
segmentCount++;
// Replace variable currentMediaTime and segmentCounter
if (targetParams.count("m3u8")){
if (newTarget.find("$currentMediaTime") == std::string::npos && newTarget.find("$segmentCounter") == std::string::npos){
FAIL_MSG("Target segmented output does not contain a currentMediaTime or segmentCounter: %s", newTarget.c_str());
Util::logExitReason("Target segmented output does not contain a currentMediaTime or segmentCounter: %s", newTarget.c_str());
onFinish();
break;
}
Util::replace(newTarget, "$currentMediaTime", JSON::Value(currentStartTime).asString());
Util::replace(newTarget, "$segmentCounter", JSON::Value(segmentCount).asString());
}
if (newTarget.rfind('?') != std::string::npos){ if (newTarget.rfind('?') != std::string::npos){
newTarget.erase(newTarget.rfind('?')); newTarget.erase(newTarget.rfind('?'));
} }
// Keep track of filenames written, so that they can be added to the playlist file currentTarget = newTarget;
previousFile = currentFile;
currentFile = newTarget;
INFO_MSG("Switching to next push target filename: %s", newTarget.c_str()); INFO_MSG("Switching to next push target filename: %s", newTarget.c_str());
if (!connectToFile(newTarget)){ if (!connectToFile(newTarget)){
FAIL_MSG("Failed to open file, aborting: %s", newTarget.c_str()); FAIL_MSG("Failed to open file, aborting: %s", newTarget.c_str());
@ -1404,6 +1749,41 @@ namespace Mist{
INFO_MSG("Client handler shutting down, exit reason: %s", Util::exitReason); INFO_MSG("Client handler shutting down, exit reason: %s", Util::exitReason);
} }
onFinish(); onFinish();
// Write last segment
if (targetParams.count("m3u8")){
// If this is VOD, we can finally open up the connection to the playlist file
if (M.getVod()){connectToFile(playlistLocationString, false, &plsConn);}
if (plsConn){
std::string segment = HTTP::URL(currentTarget).getLinkFrom(playlistLocation);
if (M.getLive()){
uint64_t unixMs = M.getBootMsOffset() + systemBoot + currentStartTime;
playlistBuffer += "#EXT-X-PROGRAM-DATE-TIME:" + Util::getUTCStringMillis(unixMs) + "\n";
}
INFO_MSG("Adding final segment `%s` of %lums to playlist '%s'", segment.c_str(), lastPacketTime - currentStartTime, playlistLocationString.c_str());
// Append duration & TS filename to playlist file
std::stringstream tmp;
tmp << "#EXTINF:" << std::fixed << std::setprecision(3) << (lastPacketTime - currentStartTime) / 1000.0 << ",\n"+ segment + "\n";
if (!M.getLive() || (!maxEntries && !targetAge)){tmp << "#EXT-X-ENDLIST\n";}
playlistBuffer += tmp.str();
// Remove older entries in the playlist
if (maxEntries || targetAge){
uint64_t unixMs = M.getBootMsOffset() + systemBoot + currentStartTime;
reinitPlaylist(playlistBuffer, targetAge, maxEntries, segmentCount, segmentsRemoved, unixMs, targetDuration, playlistLocation);
}
// Append the final contents to the playlist
if (!maxEntries && !targetAge) {
plsConn.SendNow(playlistBuffer);
// Else re-open the file to force an overwrite
}else if(connectToFile(playlistLocationString, false, &plsConn)){
plsConn.SendNow(playlistBuffer);
}
playlistBuffer = "";
}else{
FAIL_MSG("Lost connection to the playlist file `%s` during segmenting", playlistLocationString.c_str());
Util::logExitReason("Lost connection to the playlist file `%s` during segmenting", playlistLocationString.c_str());
return 1;
}
}
/*LTS-START*/ /*LTS-START*/
if (Triggers::shouldTrigger("CONN_CLOSE", streamName)){ if (Triggers::shouldTrigger("CONN_CLOSE", streamName)){
@ -1889,6 +2269,14 @@ namespace Mist{
ERROR_MSG("Failed to open file %s, error: %s", file.c_str(), strerror(errno)); ERROR_MSG("Failed to open file %s, error: %s", file.c_str(), strerror(errno));
return false; return false;
} }
if (*conn){
flock(conn->getSocket(), LOCK_UN | LOCK_NB);
}
// Lock the file in exclusive mode to ensure no other processes write to it
if(flock(outFile, LOCK_EX | LOCK_NB)){
ERROR_MSG("Failed to lock file %s, error: %s", file.c_str(), strerror(errno));
return false;
}
//Ensure the Socket::Connection is valid before we overwrite the socket //Ensure the Socket::Connection is valid before we overwrite the socket
if (!*conn){ if (!*conn){
@ -1904,7 +2292,6 @@ namespace Mist{
return false; return false;
} }
close(outFile); close(outFile);
isRecordingToFile = true;
realTime = 0; realTime = 0;
return true; return true;
} }

View file

@ -11,6 +11,7 @@
#include <mist/socket.h> #include <mist/socket.h>
#include <mist/timing.h> #include <mist/timing.h>
#include <mist/stream.h> #include <mist/stream.h>
#include <mist/url.h>
#include <set> #include <set>
namespace Mist{ namespace Mist{
@ -31,8 +32,6 @@ namespace Mist{
/*LTS-START*/ /*LTS-START*/
std::string reqUrl; std::string reqUrl;
/*LTS-END*/ /*LTS-END*/
std::string previousFile;
std::string currentFile;
// non-virtual generic functions // non-virtual generic functions
virtual int run(); virtual int run();
virtual void stats(bool force = false); virtual void stats(bool force = false);
@ -93,6 +92,9 @@ namespace Mist{
uint64_t pageNumMax(size_t trackId); uint64_t pageNumMax(size_t trackId);
bool isRecordingToFile; bool isRecordingToFile;
uint64_t lastStats; ///< Time of last sending of stats. uint64_t lastStats; ///< Time of last sending of stats.
void reinitPlaylist(std::string &playlistBuffer, uint64_t &maxAge, uint64_t &maxEntries,
uint64_t &segmentCount, uint64_t &segmentsRemoved, uint64_t &curTime,
std::string targetDuration, HTTP::URL &playlistLocation);
Util::packetSorter buffer; ///< A sorted list of next-to-be-loaded packets. Util::packetSorter buffer; ///< A sorted list of next-to-be-loaded packets.
bool sought; ///< If a seek has been done, this is set to true. Used for seeking on bool sought; ///< If a seek has been done, this is set to true. Used for seeking on

View file

@ -13,16 +13,8 @@
namespace Mist{ namespace Mist{
OutHTTPTS::OutHTTPTS(Socket::Connection &conn) : TSOutput(conn){ OutHTTPTS::OutHTTPTS(Socket::Connection &conn) : TSOutput(conn){
sendRepeatingHeaders = 500; // PAT/PMT every 500ms (DVB spec) sendRepeatingHeaders = 500; // PAT/PMT every 500ms (DVB spec)
removeOldPlaylistFiles = true; HTTP::URL target(config->getString("target"));
if (target.protocol == "srt"){
if (targetParams["overwrite"].size()){
std::string paramValue = targetParams["overwrite"];
if (paramValue == "0" || paramValue == "false"){
removeOldPlaylistFiles = false;
}
}
if (config->getString("target").substr(0, 6) == "srt://"){
std::string tgt = config->getString("target"); std::string tgt = config->getString("target");
HTTP::URL srtUrl(tgt); HTTP::URL srtUrl(tgt);
config->getOption("target", true).append("ts-exec:srt-live-transmit file://con " + srtUrl.getUrl()); config->getOption("target", true).append("ts-exec:srt-live-transmit file://con " + srtUrl.getUrl());
@ -55,59 +47,6 @@ namespace Mist{
wantRequest = false; wantRequest = false;
parseData = true; parseData = true;
} else if (config->getString("target").size()){
HTTP::URL target(config->getString("target"));
// If writing to a playlist file, set target strings and remember playlist location
if(target.getExt() == "m3u" || target.getExt() == "m3u8"){
// Location to .m3u(8) file we will keep updated
playlistLocation = target.getFilePath();
// Subfolder name which gets prepended to each entry in the playlist file
prepend = "./segments_" + target.path.substr(target.path.rfind("/") + 1, target.path.size() - target.getExt().size() - target.path.rfind("/") - 2) + "/";
HTTP::URL tsFolderPath(target.link(prepend).getFilePath());
tsFilePath = tsFolderPath.getFilePath() + "$datetime.ts";
INFO_MSG("Playlist location will be '%s'. TS filename will be in the form of '%s'", playlistLocation.c_str(), tsFilePath.c_str());
// Remember target name including the $datetime variable
setenv("MST_ORIG_TARGET", tsFilePath.c_str(), 1);
// If the playlist exists, first remove existing TS files
if (removeOldPlaylistFiles){
DIR *dir = opendir(tsFolderPath.getFilePath().c_str());
if (dir){
INFO_MSG("Removing TS files in %s", tsFolderPath.getFilePath().c_str());
struct dirent *dp;
do{
errno = 0;
if ((dp = readdir(dir))){
HTTP::URL filePath = tsFolderPath.link(dp->d_name);
if (filePath.getExt() == "ts"){
MEDIUM_MSG("Removing TS file '%s'", filePath.getFilePath().c_str());
remove(filePath.getFilePath().c_str());
}
}
}while (dp != NULL);
closedir(dir);
}
// Also remove the playlist file itself. SendHeader handles (re)creation of the playlist file
if (!remove(playlistLocation.c_str())){
HIGH_MSG("Removed existing playlist file '%s'", playlistLocation.c_str());
}
}else{
// Else we want to add the #EXT-X-DISCONTINUITY tag
std::ofstream outPlsFile;
outPlsFile.open(playlistLocation.c_str(), std::ofstream::app);
outPlsFile << "#EXT-X-DISCONTINUITY" << "\n";
outPlsFile.close();
}
// Set first target filename
Util::streamVariables(tsFilePath, streamName);
if (tsFilePath.rfind('?') != std::string::npos){
tsFilePath.erase(tsFilePath.rfind('?'));
}
config->getOption("target", true).append(tsFilePath);
// Finally set split time in seconds
std::stringstream ss;
ss << config->getInteger("targetSegmentLength");
targetParams["split"] = ss.str();
}
} }
} }
@ -142,8 +81,6 @@ namespace Mist{
capa["methods"][0u]["priority"] = 1; capa["methods"][0u]["priority"] = 1;
capa["push_urls"].append("/*.ts"); capa["push_urls"].append("/*.ts");
capa["push_urls"].append("ts-exec:*"); capa["push_urls"].append("ts-exec:*");
capa["push_urls"].append("/*.m3u");
capa["push_urls"].append("/*.m3u8");
#ifndef WITH_SRT #ifndef WITH_SRT
{ {
@ -166,21 +103,8 @@ namespace Mist{
opt["arg"] = "string"; opt["arg"] = "string";
opt["default"] = ""; opt["default"] = "";
opt["arg_num"] = 1; opt["arg_num"] = 1;
opt["help"] = "Target filename to store TS file as, '*.m3u8' or '*.m3u' for writing to a playlist, or - for stdout."; opt["help"] = "Target filename to store TS file as or - for stdout.";
cfg->addOption("target", opt); cfg->addOption("target", opt);
opt.null();
opt["arg"] = "integer";
opt["long"] = "targetSegmentLength";
opt["short"] = "l";
opt["help"] = "Target time duration in seconds for TS files, when outputting to disk.";
opt["value"].append(5);
config->addOption("targetSegmentLength", opt);
capa["optional"]["targetSegmentLength"]["name"] = "Length of TS files (ms)";
capa["optional"]["targetSegmentLength"]["help"] = "Target time duration in milliseconds for TS files, when outputting to disk.";
capa["optional"]["targetSegmentLength"]["option"] = "--targetLength";
capa["optional"]["targetSegmentLength"]["type"] = "uint";
capa["optional"]["targetSegmentLength"]["default"] = 5;
} }
bool OutHTTPTS::isRecording(){return config->getString("target").size();} bool OutHTTPTS::isRecording(){return config->getString("target").size();}
@ -207,76 +131,7 @@ namespace Mist{
wantRequest = false; wantRequest = false;
} }
/// \brief Goes through all of the packets in a TS file in order to calculate the total duration
/// \param firstTime: is set to the firstTime of the TS file
float OutHTTPTS::calculateSegmentDuration(std::string filepath, uint64_t & firstTime){
firstTime = -1;
uint64_t lastTime = 0;
FILE *inFile;
TS::Packet packet;
DTSC::Packet headerPack;
TS::Stream tsStream;
inFile = fopen(filepath.c_str(), "r");
while (!feof(inFile)){
if (!packet.FromFile(inFile)){
break;
}
tsStream.parse(packet, 0);
while (tsStream.hasPacketOnEachTrack()){
tsStream.getEarliestPacket(headerPack);
lastTime = headerPack.getTime();
if (firstTime > lastTime){
firstTime = headerPack.getTime();
}
DONTEVEN_MSG("Found DTSC packet with timestamp %" PRIu64, lastTime);
}
}
fclose(inFile);
HIGH_MSG("Duration of TS file at location '%s' is " PRETTY_PRINT_MSTIME " (" PRETTY_PRINT_MSTIME " - " PRETTY_PRINT_MSTIME ")", filepath.c_str(), PRETTY_ARG_MSTIME(lastTime - firstTime), PRETTY_ARG_MSTIME(lastTime), PRETTY_ARG_MSTIME(firstTime));
return (lastTime - firstTime);
}
void OutHTTPTS::sendHeader(){ void OutHTTPTS::sendHeader(){
bool writeTimestamp = true;
if (previousFile != ""){
std::ofstream outPlsFile;
// Calculate segment duration and round up to the nearest integer
uint64_t firstTime = 0;
float segmentDuration = (calculateSegmentDuration(previousFile, firstTime) / 1000);
if (segmentDuration > config->getInteger("targetSegmentLength")){
WARN_MSG("Segment duration exceeds target segment duration. This may cause playback stalls or other errors");
}
// If the playlist does not exist, init it
FILE *fileHandle = fopen(playlistLocation.c_str(), "r");
if (!fileHandle || removeOldPlaylistFiles){
INFO_MSG("Creating new playlist at '%s'", playlistLocation.c_str());
removeOldPlaylistFiles = false;
outPlsFile.open(playlistLocation.c_str(), std::ofstream::trunc);
outPlsFile << "#EXTM3U\n" << "#EXT-X-VERSION:3\n" << "#EXT-X-PLAYLIST-TYPE:EVENT\n"
<< "#EXT-X-TARGETDURATION:" << config->getInteger("targetSegmentLength") << "\n#EXT-X-MEDIA-SEQUENCE:0\n";
// Add current livestream timestamp
if (M.getLive()){
uint64_t unixMs = M.getBootMsOffset() + (Util::unixMS() - Util::bootMS()) + firstTime;
outPlsFile << "#EXT-X-PROGRAM-DATE-TIME:" << Util::getUTCStringMillis(unixMs) << std::endl;
writeTimestamp = false;
}
// Otherwise open it in append mode
} else {
fclose(fileHandle);
outPlsFile.open(playlistLocation.c_str(), std::ofstream::app);
}
// Add current timestamp
if (M.getLive() && writeTimestamp){
uint64_t unixMs = M.getBootMsOffset() + (Util::unixMS() - Util::bootMS()) + firstTime;
outPlsFile << "#EXT-X-PROGRAM-DATE-TIME:" << Util::getUTCStringMillis(unixMs) << std::endl;
}
INFO_MSG("Adding new segment of %.2f seconds to playlist '%s'", segmentDuration, playlistLocation.c_str());
// Append duration & TS filename to playlist file
outPlsFile << "#EXTINF:" << segmentDuration << ",\n" << prepend << previousFile.substr(previousFile.rfind("/") + 1) << "\n";
outPlsFile.close();
}
TSOutput::sendHeader(); TSOutput::sendHeader();
} }

View file

@ -15,19 +15,11 @@ namespace Mist{
bool isRecording(); bool isRecording();
bool isFileTarget(){ bool isFileTarget(){
HTTP::URL target(config->getString("target")); HTTP::URL target(config->getString("target"));
if (isRecording() && (target.getExt() == "ts" && config->getString("target").substr(0, 8) != "ts-exec:")){return true;} if (isRecording() && (config->getString("target").substr(0, 8) != "ts-exec:")){return true;}
return false; return false;
} }
virtual bool inlineRestartCapable() const{return true;} virtual bool inlineRestartCapable() const{return true;}
void sendHeader(); void sendHeader();
float calculateSegmentDuration(std::string filepath, uint64_t & firstTime);
// Location of playlist file which we need to keep updated
std::string playlistLocation;
std::string tsFilePath;
// Subfolder name (based on playlist name) which gets prepended to each entry in the playlist file
std::string prepend;
// Defaults to True. When exporting to .m3u8 & TS, it will overwrite the existing playlist file and remove existing .TS files
bool removeOldPlaylistFiles;
}; };
}// namespace Mist }// namespace Mist