Playlist support fixed

This commit is contained in:
Thulinma 2021-07-19 16:03:05 +02:00
parent 684df4b23d
commit ad4c1abd0b
20 changed files with 343 additions and 237 deletions

View file

@ -1452,12 +1452,12 @@ namespace DTSC{
/// Resizes a given track to be able to hold the given amount of fragments, keys, parts and pages.
/// Currently called exclusively from Meta::update(), to resize the internal structures.
void Meta::resizeTrack(size_t source, size_t fragCount, size_t keyCount, size_t partCount, size_t pageCount, const char * reason){
char pageName[NAME_BUFFER_SIZE];
IPC::semaphore resizeLock;
if (!isMemBuf){
snprintf(pageName, NAME_BUFFER_SIZE, "/" SHM_STREAM_TM, streamName.c_str(), getpid(), source);
resizeLock.open(pageName, O_CREAT | O_RDWR, ACCESSPERMS, 1);
std::string pageName = "/";
pageName += trackList.getPointer(trackPageField, source);
resizeLock.open(pageName.c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1);
resizeLock.wait();
}
@ -1492,9 +1492,8 @@ namespace DTSC{
memset(tMemBuf[source], 0, newPageSize);
t.track = Util::RelAccX(tMemBuf[source], false);
}else{
snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_TM, streamName.c_str(), getpid(), source);
tM[source].master = true;
tM[source].init(pageName, newPageSize, true);
tM[source].init(trackList.getPointer(trackPageField, source), newPageSize, true);
if (!tM[source].mapped){
FAIL_MSG("Failed to re-allocate shared memory for track %zu: %s", source, strerror(errno));
resizeLock.unlink();
@ -1711,6 +1710,26 @@ namespace DTSC{
return tNumber;
}
bool Meta::isClaimed(size_t trackIdx) const{
return (trackList.getInt(trackPidField, trackIdx) != 0);
}
void Meta::claimTrack(size_t trackIdx){
if (trackList.getInt(trackPidField, trackIdx) != 0){
FAIL_MSG("Cannot claim track: already claimed by PID %" PRIu64, trackList.getInt(trackPidField, trackIdx));
return;
}
trackList.setInt(trackPidField, getpid(), trackIdx);
}
void Meta::abandonTrack(size_t trackIdx){
if (trackList.getInt(trackPidField, trackIdx) != getpid()){
FAIL_MSG("Cannot abandon track: is claimed by PID %" PRIu64 ", not us", trackList.getInt(trackPidField, trackIdx));
return;
}
trackList.setInt(trackPidField, 0, trackIdx);
}
/// Internal function that is called whenever a track is (re)written to the memory structures.
/// Adds the needed fields and sets all the RelAccXFieldData members to point to them.
void Meta::initializeTrack(Track &t, size_t fragCount, size_t keyCount, size_t partCount, size_t pageCount){
@ -2178,12 +2197,10 @@ namespace DTSC{
/// Removes the first key from the memory structure and caches.
bool Meta::removeFirstKey(size_t trackIdx){
char pageName[NAME_BUFFER_SIZE];
IPC::semaphore resizeLock;
if (!isMemBuf){
__pid_t trPid = trackList.getInt(trackPidField, trackIdx);
snprintf(pageName, NAME_BUFFER_SIZE, "/" SHM_STREAM_TM, streamName.c_str(), trPid, trackIdx);
const char * pageName = trackList.getPointer(trackPageField, trackIdx);
resizeLock.open(pageName, O_CREAT | O_RDWR, ACCESSPERMS, 1);
if (!resizeLock.tryWait()){
MEDIUM_MSG("Metadata is busy, delaying deletion of key a bit");

View file

@ -382,6 +382,10 @@ namespace DTSC{
void setMaxKeepAway(uint64_t maxKeepAway);
uint64_t getMaxKeepAway() const;
void claimTrack(size_t trackIdx);
bool isClaimed(size_t trackIdx) const;
void abandonTrack(size_t trackIdx);
/*LTS-START*/
void setSourceTrack(size_t trackIdx, size_t sourceTrack);
uint64_t getSourceTrack(size_t trackIdx) const;

View file

@ -170,21 +170,7 @@ namespace Mist{
capa["optional"]["realtime"]["name"] = "Simulated Live";
capa["optional"]["realtime"]["help"] = "Make this input run as a simulated live stream";
capa["optional"]["realtime"]["option"] = "--realtime";
option.null();
option["long"] = "simulated-starttime";
option["arg"] = "integer";
option["short"] = "S";
option["help"] = "Unix timestamp on which the simulated start of the stream is based.";
option["value"].append(0);
config->addOption("simulated-starttime", option);
capa["optional"]["simulated-starttime"]["name"] = "Simulated start time";
capa["optional"]["simulated-starttime"]["help"] =
"The unix timestamp on which this stream is assumed to have started playback, or 0 for "
"automatic";
capa["optional"]["simulated-starttime"]["option"] = "--simulated-starttime";
capa["optional"]["simulated-starttime"]["type"] = "uint";
capa["optional"]["simulated-starttime"]["default"] = 0;
/*LTS-END*/
capa["optional"]["debug"]["name"] = "debug";
@ -761,6 +747,15 @@ namespace Mist{
}
INFO_MSG("Input started");
//Simulated real time inputs bypass most normal logic
if (config->getBool("realtime")){
realtimeMainLoop();
finish();
INFO_MSG("Real-time input closing clean; reason: %s", Util::exitReason);
return;
}
meta.reInit(streamName, false);
if (!openStreamSource()){
@ -769,10 +764,8 @@ namespace Mist{
}
parseStreamHeader();
std::set<size_t> validTracks;
if (publishesTracks()){
validTracks = M.getMySourceTracks(getpid());
std::set<size_t> validTracks = M.getMySourceTracks(getpid());
if (!validTracks.size()){
userSelect.clear();
finish();
@ -781,72 +774,9 @@ namespace Mist{
}
}
timeOffset = 0;
uint64_t minFirstMs = 0;
// If resume mode is on, find matching tracks and set timeOffset values to make sure we append to the tracks.
if (publishesTracks() && config->getBool("realtime")){
seek(0);
minFirstMs = 0xFFFFFFFFFFFFFFFFull;
uint64_t maxFirstMs = 0;
uint64_t minLastMs = 0xFFFFFFFFFFFFFFFFull;
uint64_t maxLastMs = 0;
// track lowest firstms value
for (std::set<size_t>::iterator it = validTracks.begin(); it != validTracks.end(); ++it){
if (meta.getFirstms(*it) < minFirstMs){minFirstMs = meta.getFirstms(*it);}
if (meta.getFirstms(*it) > maxFirstMs){maxFirstMs = meta.getFirstms(*it);}
if (meta.getLastms(*it) < minLastMs){minLastMs = meta.getLastms(*it);}
if (meta.getLastms(*it) > maxLastMs){maxLastMs = meta.getLastms(*it);}
}
if (maxFirstMs - minFirstMs > 500){
WARN_MSG("Begin timings of tracks for this file are %" PRIu64
" ms apart. This may mess up playback to some degree. (Range: %" PRIu64
"ms - %" PRIu64 "ms)",
maxFirstMs - minFirstMs, minFirstMs, maxFirstMs);
}
if (maxLastMs - minLastMs > 500){
WARN_MSG("Stop timings of tracks for this file are %" PRIu64
" ms apart. This may mess up playback to some degree. (Range: %" PRIu64
"ms - %" PRIu64 "ms)",
maxLastMs - minLastMs, minLastMs, maxLastMs);
}
// find highest current time
for (std::set<size_t>::iterator it = validTracks.begin(); it != validTracks.end(); ++it){
timeOffset = std::max(timeOffset, (int64_t)meta.getLastms(*it));
}
if (timeOffset){
if (minFirstMs == 0xFFFFFFFFFFFFFFFFull){minFirstMs = 0;}
MEDIUM_MSG("Offset is %" PRId64
"ms, adding 40ms and subtracting the start time of %" PRIu64,
timeOffset, minFirstMs);
timeOffset += 40; // Add an artificial frame at 25 FPS to make sure we append, not overwrite
timeOffset -= minFirstMs; // we don't need to add the lowest firstms value to the offset, as it's already there
}
}
if (publishesTracks()){
for (std::set<size_t>::iterator it = validTracks.begin(); it != validTracks.end(); ++it){
meta.setFirstms(*it, meta.getFirstms(*it)+timeOffset);
meta.setLastms(*it, 0);
}
}
simStartTime = config->getInteger("simulated-starttime");
if (!simStartTime){simStartTime = Util::bootMS();}
std::string reason;
if (config->getBool("realtime")){
realtimeMainLoop();
}else{
streamMainLoop();
}
streamMainLoop();
closeStreamSource();
userSelect.clear();
finish();
INFO_MSG("Input closing clean; reason: %s", Util::exitReason);
return;
@ -855,30 +785,24 @@ namespace Mist{
void Input::streamMainLoop(){
uint64_t statTimer = 0;
uint64_t startTime = Util::bootSecs();
size_t tid;
size_t idx;
Comms::Statistics statComm;
getNext();
tid = thisPacket.getTrackId();
idx = M.trackIDToIndex(tid, getpid());
if (thisPacket && !userSelect.count(idx)){
userSelect[idx].reload(streamName, idx, COMM_STATUS_ACTIVE | COMM_STATUS_SOURCE | COMM_STATUS_DONOTTRACK);
if (thisPacket && !userSelect.count(thisIdx)){
userSelect[thisIdx].reload(streamName, thisIdx, COMM_STATUS_ACTIVE | COMM_STATUS_SOURCE | COMM_STATUS_DONOTTRACK);
}
while (thisPacket && config->is_active && userSelect[idx]){
if (userSelect[idx].getStatus() & COMM_STATUS_REQDISCONNECT){
while (thisPacket && config->is_active && userSelect[thisIdx]){
if (userSelect[thisIdx].getStatus() & COMM_STATUS_REQDISCONNECT){
Util::logExitReason("buffer requested shutdown");
break;
}
bufferLivePacket(thisPacket);
getNext();
if (!thisPacket){
Util::logExitReason("invalid packet from getNext");
Util::logExitReason("no more data");
break;
}
tid = thisPacket.getTrackId();
idx = M.trackIDToIndex(tid, getpid());
if (thisPacket && !userSelect.count(idx)){
userSelect[idx].reload(streamName, idx, COMM_STATUS_ACTIVE | COMM_STATUS_SOURCE | COMM_STATUS_DONOTTRACK);
if (thisPacket && !userSelect.count(thisIdx)){
userSelect[thisIdx].reload(streamName, thisIdx, COMM_STATUS_ACTIVE | COMM_STATUS_SOURCE | COMM_STATUS_DONOTTRACK);
}
if (Util::bootSecs() - statTimer > 1){
@ -912,39 +836,147 @@ namespace Mist{
}
void Input::realtimeMainLoop(){
MEDIUM_MSG("Starting real-time main loop!");
uint64_t statTimer = 0;
uint64_t startTime = Util::bootSecs();
size_t idx;
Comms::Statistics statComm;
getNext();
if (thisPacket && !userSelect.count(thisPacket.getTrackId())){
size_t tid = thisPacket.getTrackId();
userSelect[tid].reload(streamName, tid, COMM_STATUS_ACTIVE | COMM_STATUS_SOURCE | COMM_STATUS_DONOTTRACK);
DTSC::Meta liveMeta(config->getString("streamname"), false);
DTSC::veryUglyJitterOverride = SIMULATED_LIVE_BUFFER;
uint64_t minFirstMs = 0xFFFFFFFFFFFFFFFFull;
uint64_t maxFirstMs = 0;
uint64_t minLastMs = 0xFFFFFFFFFFFFFFFFull;
uint64_t maxLastMs = 0;
// track lowest firstms value
std::set<size_t> validTracks = M.getValidTracks();
INFO_MSG("VoD metadata has %zu valid tracks", validTracks.size());
if (!validTracks.size()){
FAIL_MSG("No valid tracks! Aborting!");
return;
}
while (thisPacket && config->is_active && userSelect[thisPacket.getTrackId()]){
thisPacket.nullMember("bpos");
while (config->is_active && userSelect[thisPacket.getTrackId()] &&
Util::bootMS() + SIMULATED_LIVE_BUFFER < (thisPacket.getTime() + timeOffset) + simStartTime){
Util::sleep(std::min(((thisPacket.getTime() + timeOffset) + simStartTime) - (Util::getMS() + SIMULATED_LIVE_BUFFER),
for (std::set<size_t>::iterator it = validTracks.begin(); it != validTracks.end(); ++it){
if (M.getFirstms(*it) < minFirstMs){minFirstMs = M.getFirstms(*it);}
if (M.getFirstms(*it) > maxFirstMs){maxFirstMs = M.getFirstms(*it);}
if (M.getLastms(*it) < minLastMs){minLastMs = M.getLastms(*it);}
if (M.getLastms(*it) > maxLastMs){maxLastMs = M.getLastms(*it);}
}
if (maxFirstMs - minFirstMs > 500){
WARN_MSG("Begin timings of tracks for this file are %" PRIu64
" ms apart. This may mess up playback to some degree. (Range: %" PRIu64
"ms - %" PRIu64 "ms)",
maxFirstMs - minFirstMs, minFirstMs, maxFirstMs);
}
if (maxLastMs - minLastMs > 500){
WARN_MSG("Stop timings of tracks for this file are %" PRIu64
" ms apart. This may mess up playback to some degree. (Range: %" PRIu64
"ms - %" PRIu64 "ms)",
maxLastMs - minLastMs, minLastMs, maxLastMs);
}
if (minFirstMs == 0xFFFFFFFFFFFFFFFFull){minFirstMs = 0;}
// find highest current time
int64_t timeOffset = 0;
validTracks = liveMeta.getValidTracks();
for (std::set<size_t>::iterator it = validTracks.begin(); it != validTracks.end(); ++it){
timeOffset = std::max(timeOffset, (int64_t)liveMeta.getLastms(*it));
}
INFO_MSG("Live metadata has %zu valid tracks, last timestamp %" PRIu64, validTracks.size(), timeOffset);
if (timeOffset){
MEDIUM_MSG("Offset is %" PRId64
"ms, adding 40ms and subtracting the start time of %" PRIu64,
timeOffset, minFirstMs);
timeOffset += 40; // Add an artificial frame at 25 FPS to make sure we append, not overwrite
}
timeOffset -= minFirstMs; // we don't need to add the lowest firstms value to the offset, as it's already there
/// This maps local track offsets to stream track offsets
std::map<uint64_t, uint64_t> realTimeTrackMap;
//No time offset and/or no currently valid tracks?
//That means this must be the first entry in this realtime stream. Create the tracks!
if (!timeOffset || !validTracks.size()){
liveMeta.setBootMsOffset(Util::bootMS());
validTracks = M.getValidTracks();
size_t newID = 0;
for (std::set<size_t>::iterator it = validTracks.begin(); it != validTracks.end(); ++it){
size_t newIdx = liveMeta.addTrack();
realTimeTrackMap[*it] = newIdx;
MEDIUM_MSG("Gonna write track %zu to %zu", *it, newIdx);
liveMeta.setID(newIdx, newID++);
liveMeta.setType(newIdx, M.getType(*it));
liveMeta.setCodec(newIdx, M.getCodec(*it));
liveMeta.setFpks(newIdx, M.getFpks(*it));
liveMeta.setInit(newIdx, M.getInit(*it));
liveMeta.setLang(newIdx, M.getLang(*it));
liveMeta.setRate(newIdx, M.getRate(*it));
liveMeta.setSize(newIdx, M.getSize(*it));
liveMeta.setWidth(newIdx, M.getWidth(*it));
liveMeta.setHeight(newIdx, M.getHeight(*it));
}
}else{
validTracks = M.getValidTracks();
std::set<size_t> validLive = liveMeta.getValidTracks();
for (std::set<size_t>::iterator it = validTracks.begin(); it != validTracks.end(); ++it){
for (std::set<size_t>::iterator lit = validLive.begin(); lit != validLive.end(); ++lit){
if (liveMeta.isClaimed(*lit)){continue;}
if (liveMeta.getType(*lit) != M.getType(*it)){continue;}
if (liveMeta.getCodec(*lit) != M.getCodec(*it)){continue;}
if (liveMeta.getInit(*lit) != M.getInit(*it)){continue;}
//Matching type/codec/init! Use it!
realTimeTrackMap[*it] = *lit;
liveMeta.claimTrack(*lit);
MEDIUM_MSG("Gonna write track %zu to existing track %zu", *it, *lit);
break;
}
}
}
int64_t bootMsOffset = liveMeta.getBootMsOffset();
validTracks.clear();
seek(0);/// \TODO Is this actually needed?
while (config->is_active){
getNext();
if (!thisPacket){
Util::logExitReason("no more data");
break;
}
idx = realTimeTrackMap.count(thisIdx) ? realTimeTrackMap[thisIdx] : INVALID_TRACK_ID;
if (thisPacket && !userSelect.count(idx)){
userSelect[idx].reload(streamName, idx, COMM_STATUS_ACTIVE | COMM_STATUS_SOURCE | COMM_STATUS_DONOTTRACK);
}
if (userSelect[idx].getStatus() & COMM_STATUS_REQDISCONNECT){
Util::logExitReason("buffer requested shutdown");
break;
}
while (config->is_active && userSelect[idx] &&
Util::bootMS() + SIMULATED_LIVE_BUFFER < (thisTime + timeOffset) + bootMsOffset){
Util::sleep(std::min(((thisTime + timeOffset) + bootMsOffset) - (Util::getMS() + SIMULATED_LIVE_BUFFER),
(uint64_t)1000));
}
uint64_t originalTime = thisPacket.getTime();
thisPacket.setTime(originalTime + timeOffset);
bufferLivePacket(thisPacket);
thisPacket.setTime(originalTime);
getNext();
if (thisPacket && !userSelect.count(thisPacket.getTrackId())){
size_t tid = thisPacket.getTrackId();
userSelect[tid].reload(streamName, tid, COMM_STATUS_ACTIVE | COMM_STATUS_SOURCE | COMM_STATUS_DONOTTRACK);
//Buffer the packet
if (idx == INVALID_TRACK_ID){
INFO_MSG("Packet for track %zu has no valid index!", thisIdx);
}else{
char *data;
size_t dataLen;
thisPacket.getString("data", data, dataLen);
bufferLivePacket(thisTime+timeOffset, thisPacket.getInt("offset"), idx, data, dataLen, 0, thisPacket.getFlag("keyframe"), liveMeta);
}
if (Util::bootSecs() - statTimer > 1){
// Connect to stats for INPUT detection
if (!statComm){statComm.reload();}
if (statComm){
if (statComm.getStatus() & COMM_STATUS_REQDISCONNECT){
Util::logExitReason("received shutdown request from controller");
config->is_active = false;
Util::logExitReason("received shutdown request from controller");
return;
}
uint64_t now = Util::bootSecs();
@ -952,21 +984,16 @@ namespace Mist{
statComm.setCRC(getpid());
statComm.setStream(streamName);
statComm.setConnector("INPUT:" + capa["name"].asStringRef());
statComm.setUp(0);
statComm.setDown(streamByteCount());
statComm.setTime(now - startTime);
statComm.setLastSecond(0);
statComm.setHost(getConnectedBinHost());
connStats(statComm);
}
statTimer = Util::bootSecs();
}
}
if (!thisPacket){
Util::logExitReason("invalid packet from getNext");
}
if (thisPacket && !userSelect[thisPacket.getTrackId()]){
Util::logExitReason("buffer shutdown");
for (std::map<uint64_t, uint64_t>::iterator it = realTimeTrackMap.begin(); it != realTimeTrackMap.end(); ++it){
liveMeta.abandonTrack(it->second);
}
}
@ -1249,7 +1276,7 @@ namespace Mist{
pageIdx = i;
}
uint32_t pageNumber = tPages.getInt("firstkey", pageIdx);
if (isBuffered(idx, pageNumber)){
if (isBuffered(idx, pageNumber, meta)){
// Mark the page for removal after 15 seconds of no one watching it
pageCounter[idx][pageNumber] = DEFAULT_PAGE_TIMEOUT;
DONTEVEN_MSG("Track %zu, key %" PRIu32 " is already buffered in page %" PRIu32
@ -1261,7 +1288,7 @@ namespace Mist{
uint64_t bufferTimer = Util::bootMS();
keyNum = pageNumber;
IPC::sharedPage page;
if (!bufferStart(idx, pageNumber, page)){
if (!bufferStart(idx, pageNumber, page, meta)){
WARN_MSG("bufferStart failed! Cancelling bufferFrame");
return false;
}
@ -1288,7 +1315,7 @@ namespace Mist{
}else{
getNext(sourceIdx);
// in case earlier seeking was imprecise, seek to the exact point
while (thisPacket && thisPacket.getTime() < keyTime){getNext(sourceIdx);}
while (thisPacket && thisTime < keyTime){getNext(sourceIdx);}
}
uint64_t lastBuffered = 0;
uint32_t packCounter = 0;
@ -1313,8 +1340,8 @@ namespace Mist{
size_t partNo = 0;
for (size_t i = 0; i < keyNum; ++i){partNo += keys.getParts(i);}
DTSC::Parts parts(M.parts(idx));
while (thisPacket && thisPacket.getTime() < stopTime){
if (thisPacket.getTime() >= lastBuffered){
while (thisPacket && thisTime < stopTime){
if (thisTime >= lastBuffered){
if (sourceIdx != idx){
if (encryption.find(":") != std::string::npos || M.getEncryption(idx).find(":") != std::string::npos){
if (encryption == ""){
@ -1325,11 +1352,11 @@ namespace Mist{
}
if (encryption.substr(0, encryption.find('/')) == "CTR128"){
DTSC::Packet encPacket = aesCipher.encryptPacketCTR(
M, thisPacket, M.getIvec(idx) + M.getPartIndex(thisPacket.getTime(), idx), idx);
M, thisPacket, M.getIvec(idx) + M.getPartIndex(thisTime, idx), idx);
thisPacket = encPacket;
}else if (encryption.substr(0, encryption.find('/')) == "CBC128"){
char ivec[] ={0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
Bit::htobll(ivec + 8, M.getIvec(idx) + M.getPartIndex(thisPacket.getTime(), idx));
Bit::htobll(ivec + 8, M.getIvec(idx) + M.getPartIndex(thisTime, idx));
DTSC::Packet encPacket = aesCipher.encryptPacketCBC(M, thisPacket, ivec, idx);
thisPacket = encPacket;
}
@ -1358,12 +1385,12 @@ namespace Mist{
INFO_MSG("Part size mismatch: %zu != %zu", dataLen, parts.getSize(partNo));
}
++partNo;
HIGH_MSG("Buffering VoD packet (%zuB) @%" PRIu64 " ms on track %zu with offset %" PRIu64, dataLen, thisPacket.getTime(), idx, thisPacket.getInt("offset"));
bufferNext(thisPacket.getTime(), thisPacket.getInt("offset"), idx, data, dataLen,
HIGH_MSG("Buffering VoD packet (%zuB) @%" PRIu64 " ms on track %zu with offset %" PRIu64, dataLen, thisTime, idx, thisPacket.getInt("offset"));
bufferNext(thisTime, thisPacket.getInt("offset"), idx, data, dataLen,
thisPacket.getInt("bpos"), thisPacket.getFlag("keyframe"), page);
++packCounter;
byteCounter += thisPacket.getDataLen();
lastBuffered = thisPacket.getTime();
lastBuffered = thisTime;
}
getNext(sourceIdx);
}
@ -1383,7 +1410,7 @@ namespace Mist{
bufferFinalize(idx, page);
bufferTimer = Util::bootMS() - bufferTimer;
INFO_MSG("Track %zu, page %" PRIu32 " (%" PRIu64 " - %" PRIu64 " ms) buffered in %" PRIu64 "ms",
idx, pageNumber, tPages.getInt("firsttime", pageIdx), thisPacket.getTime(), bufferTimer);
idx, pageNumber, tPages.getInt("firsttime", pageIdx), thisTime, bufferTimer);
INFO_MSG(" (%" PRIu32 "/%" PRIu64 " parts, %" PRIu64 " bytes)", packCounter,
tPages.getInt("parts", pageIdx), byteCounter);
pageCounter[idx][pageNumber] = DEFAULT_PAGE_TIMEOUT;
@ -1392,17 +1419,25 @@ namespace Mist{
bool Input::atKeyFrame(){
static std::map<size_t, uint64_t> lastSeen;
size_t idx = thisPacket.getTrackId();
// not in keyTimes? We're not at a keyframe.
if (!keyTimes[idx].count(thisPacket.getTime())){return false;}
if (!keyTimes[thisIdx].count(thisTime)){return false;}
// skip double times
if (lastSeen.count(idx) && lastSeen[idx] == thisPacket.getTime()){return false;}
if (lastSeen.count(thisIdx) && lastSeen[thisIdx] == thisTime){return false;}
// set last seen, and return true
lastSeen[idx] = thisPacket.getTime();
lastSeen[thisIdx] = thisTime;
return true;
}
bool Input::readExistingHeader(){
if (config->getBool("realtime")){
meta.reInit("", config->getString("input") + ".dtsh");
if (!meta){return false;}
if (meta.version != DTSH_VERSION){
INFO_MSG("Updating wrong version header file from version %u to %u", meta.version, DTSH_VERSION);
return false;
}
return meta;
}
char pageName[NAME_BUFFER_SIZE];
snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_META, config->getString("streamname").c_str());
IPC::sharedPage sp(pageName, 0, false, false);

View file

@ -77,7 +77,6 @@ namespace Mist{
JSON::Value capa;
int64_t timeOffset;
std::map<size_t, std::set<uint64_t> > keyTimes;
// Create server for user pages

View file

@ -144,14 +144,14 @@ namespace Mist{
long long packOffset = 0;
bool isKey = false;
if (packTime < 0){packTime = 0;}
size_t idx = meta.trackIDToIndex(packet.stream_index + 1);
size_t idx = meta.trackIDToIndex(packet.stream_index);
if (packet.flags & AV_PKT_FLAG_KEY && M.getType(idx) != "audio"){
isKey = true;
}
if (packet.pts != AV_NOPTS_VALUE && packet.pts != packet.dts){
packOffset = ((packet.pts - packet.dts) * 1000 * strm->time_base.num / strm->time_base.den);
}
meta.update(packTime, packOffset, packet.stream_index + 1, packet.size, packet.pos, isKey);
meta.update(packTime, packOffset, idx, packet.size, packet.pos, isKey);
av_packet_unref(&packet);
}
return true;
@ -161,11 +161,11 @@ namespace Mist{
AVPacket packet;
while (av_read_frame(pFormatCtx, &packet) >= 0){
// filter tracks we don't care about
size_t idx = meta.trackIDToIndex(packet.stream_index + 1);
size_t idx = meta.trackIDToIndex(packet.stream_index);
if (idx == INVALID_TRACK_ID){continue;}
if (wantIdx != INVALID_TRACK_ID && idx != wantIdx){continue;}
if (!userSelect.count(idx)){
HIGH_MSG("Track %u not selected", packet.stream_index + 1);
HIGH_MSG("Track %u not selected", packet.stream_index);
continue;
}
AVStream *strm = pFormatCtx->streams[packet.stream_index];
@ -179,7 +179,9 @@ namespace Mist{
if (packet.pts != AV_NOPTS_VALUE && packet.pts != packet.dts){
packOffset = ((packet.pts - packet.dts) * 1000 * strm->time_base.num / strm->time_base.den);
}
thisPacket.genericFill(packTime, packOffset, packet.stream_index + 1,
thisTime = packTime;
thisIdx = idx;
thisPacket.genericFill(packTime, packOffset, thisIdx,
(const char *)packet.data, packet.size, 0, isKey);
av_packet_unref(&packet);
return; // success!

View file

@ -476,9 +476,13 @@ namespace Mist{
}
void inputBuffer::userOnDisconnect(size_t id){
if (sourcePids.count(id)){
INFO_MSG("Disconnected track %zu", sourcePids[id]);
meta.reloadReplacedPagesIfNeeded();
removeTrack(sourcePids[id]);
if (!resumeMode){
INFO_MSG("Disconnected track %zu", sourcePids[id]);
meta.reloadReplacedPagesIfNeeded();
removeTrack(sourcePids[id]);
}else{
INFO_MSG("Track %zu lost its source, keeping it around for resume", sourcePids[id]);
}
sourcePids.erase(id);
}
}

View file

@ -262,7 +262,7 @@ namespace Mist{
moreHeader = S.getMember("moreheader").asInt();
}else{
moreHeader = 0;
meta.reInit(streamName, moreHeader);
meta.reInit(isSingular() ? streamName : "", S);
}
free(pkt);
@ -329,6 +329,8 @@ namespace Mist{
return;
}
thisPacket.reInit(pBuf.data(), pBuf.size());
thisTime = thisPacket.getTime();
thisIdx = thisPacket.getTrackId();
seekNext(thisPos.seekTime, thisPos.trackID);
fseek(F, thisPos.bytePos, SEEK_SET);
}
@ -361,6 +363,8 @@ namespace Mist{
thisPacket.reInit(srcConn); // read the next packet before continuing
continue; // parse the next packet before returning
}
thisTime = thisPacket.getTime();
thisIdx = thisPacket.getTrackId();
return; // we have a packet
}
}

View file

@ -182,7 +182,7 @@ namespace Mist{
// Create header file from file
uint64_t bench = Util::getMicros();
if (!meta || (needsLock() && isSingular())){
meta.reInit(streamName);
meta.reInit(isSingular() ? streamName : "");
}
while (readElement()){
@ -404,7 +404,7 @@ namespace Mist{
frameSize = assStr.size();
}
if (frameSize){
TP.add(newTime * timeScale, tNum, frameSize, lastClusterBPos, B.isKeyframe() && !isAudio, isVideo);
TP.add(newTime * timeScale, idx, frameSize, lastClusterBPos, B.isKeyframe() && !isAudio, isVideo);
}
}
while (TP.hasPackets()){
@ -481,6 +481,8 @@ namespace Mist{
}
thisPacket.genericFill(C.time, C.offset, C.track, C.ptr, C.dsize,
C.bpos, C.key);
thisTime = C.time;
thisIdx = C.track;
}
void InputEBML::getNext(size_t idx){
@ -533,10 +535,10 @@ namespace Mist{
uint64_t tNum = B.getTrackNum();
uint64_t newTime = lastClusterTime + B.getTimecode();
trackPredictor &TP = packBuf[tNum];
size_t trackIdx = M.trackIDToIndex(tNum, getpid());
bool isVideo = (M.getType(trackIdx) == "video");
bool isAudio = (M.getType(trackIdx) == "audio");
bool isASS = (M.getCodec(trackIdx) == "subtitle" && M.getInit(trackIdx).size());
thisIdx = M.trackIDToIndex(tNum, getpid());
bool isVideo = (M.getType(thisIdx) == "video");
bool isAudio = (M.getType(thisIdx) == "audio");
bool isASS = (M.getCodec(thisIdx) == "subtitle" && M.getInit(thisIdx).size());
// If this is a new video keyframe, flush the corresponding trackPredictor
if (isVideo && B.isKeyframe() && bufferedPacks){
@ -546,7 +548,7 @@ namespace Mist{
fillPacket(C);
TP.remove();
--bufferedPacks;
if (singleTrack && trackIdx != idx){getNext(idx);}
if (singleTrack && thisIdx != idx){getNext(idx);}
return;
}
}
@ -555,19 +557,19 @@ namespace Mist{
for (uint64_t frameNo = 0; frameNo < B.getFrameCount(); ++frameNo){
if (frameNo){
if (M.getCodec(trackIdx) == "AAC"){
newTime += (1000000 / M.getRate(trackIdx)) / timeScale; // assume ~1000 samples per frame
}else if (M.getCodec(trackIdx) == "MP3"){
newTime += (1152000 / M.getRate(trackIdx)) / timeScale; // 1152 samples per frame
}else if (M.getCodec(trackIdx) == "DTS"){
if (M.getCodec(thisIdx) == "AAC"){
newTime += (1000000 / M.getRate(thisIdx)) / timeScale; // assume ~1000 samples per frame
}else if (M.getCodec(thisIdx) == "MP3"){
newTime += (1152000 / M.getRate(thisIdx)) / timeScale; // 1152 samples per frame
}else if (M.getCodec(thisIdx) == "DTS"){
// Assume 512 samples per frame (DVD default)
// actual amount can be calculated from data, but data
// is not available during header generation...
// See: http://www.stnsoft.com/DVD/dtshdr.html
newTime += (512000 / M.getRate(trackIdx)) / timeScale;
newTime += (512000 / M.getRate(thisIdx)) / timeScale;
}else{
ERROR_MSG("Unknown frame duration for codec %s - timestamps WILL be wrong!",
M.getCodec(trackIdx).c_str());
M.getCodec(thisIdx).c_str());
}
}
uint32_t frameSize = B.getFrameSize(frameNo);
@ -579,7 +581,7 @@ namespace Mist{
memcpy(ptr, assStr.data(), frameSize);
}
if (frameSize){
TP.add(newTime * timeScale, tNum, frameSize, lastClusterBPos,
TP.add(newTime * timeScale, thisIdx, frameSize, lastClusterBPos,
B.isKeyframe() && !isAudio, isVideo, (void *)ptr);
++bufferedPacks;
}
@ -590,7 +592,7 @@ namespace Mist{
fillPacket(C);
TP.remove();
--bufferedPacks;
if (singleTrack && trackIdx != idx){getNext(idx);}
if (singleTrack && thisIdx != idx){getNext(idx);}
}else{
// We didn't set thisPacket yet. Read another.
// Recursing is fine, this can only happen a few times in a row.

View file

@ -150,7 +150,7 @@ namespace Mist{
bool readExistingHeader();
void parseStreamHeader(){readHeader();}
bool openStreamSource(){return true;}
bool needHeader(){return needsLock() && !readExistingHeader();}
bool needHeader(){return (config->getBool("realtime") || needsLock()) && !readExistingHeader();}
double timeScale;
bool wantBlocks;
size_t totalBytes;

View file

@ -79,7 +79,8 @@ namespace Mist{
bool inputFLV::readHeader(){
if (!inFile){return false;}
meta.reInit(config->getString("streamname"));
if (readExistingHeader()){return true;}
meta.reInit(isSingular() ? streamName : "");
// Create header file from FLV data
Util::fseek(inFile, 13, SEEK_SET);
AMF::Object amf_storage;
@ -142,8 +143,9 @@ namespace Mist{
if (!tmpTag.getDataLen() || (tmpTag.needsInitData() && tmpTag.isInitData())){
return getNext(idx);
}
size_t tNumber = meta.trackIDToIndex(tmpTag.getTrackID(), getpid());
thisPacket.genericFill(tmpTag.tagTime(), tmpTag.offset(), tNumber, tmpTag.getData(),
thisIdx = meta.trackIDToIndex(tmpTag.getTrackID(), getpid());
thisTime = tmpTag.tagTime();
thisPacket.genericFill(thisTime, tmpTag.offset(), thisIdx, tmpTag.getData(),
tmpTag.getDataLen(), lastBytePos, tmpTag.isKeyframe);
if (M.getCodec(idx) == "PCM" && M.getSize(idx) == 16){

View file

@ -122,6 +122,8 @@ namespace Mist{
if (M.getFpks(tNumber)){ts = frameCount * (1000000 / M.getFpks(tNumber));}
thisPacket.genericFill(ts, 0, tNumber, 0, 0, 0, h264::isKeyframe(NAL.data(), nalSize));
thisPacket.appendNal(NAL.data(), nalSize);
thisTime = ts;
thisIdx = tNumber;
++frameCount;
return;
}

View file

@ -629,7 +629,7 @@ namespace Mist{
FAIL_MSG("Failed to load HLS playlist, aborting");
return;
}
meta.reInit(config->getString("streamname"), false);
meta.reInit(isSingular() ? streamName : "", false);
INFO_MSG("Parsing live stream to create header...");
TS::Packet packet; // to analyse and extract data
int pidCounter = 1;
@ -758,7 +758,7 @@ namespace Mist{
char *data;
size_t dataLen;
bool hasPacket = false;
meta.reInit(config->getString("streamname"), true);
meta.reInit(isSingular() ? streamName : "");
tthread::lock_guard<tthread::mutex> guard(entryMutex);
for (std::map<uint32_t, std::deque<playListEntries> >::iterator pListIt = listEntries.begin();
@ -832,7 +832,6 @@ namespace Mist{
// Finally save the offset as part of the TS segment. This is required for bufferframe
// to work correctly, since not every segment might have an UTC timestamp tag
std::deque<playListEntries> &curList = listEntries[pListIt->first];
INFO_MSG("Saving offset of '%" PRId64 "' to current TS segment", plsTimeOffset[pListIt->first]);
curList.at(entId-1).timeOffset = plsTimeOffset[pListIt->first];
}
}
@ -852,6 +851,7 @@ namespace Mist{
}
bool inputHLS::needsLock(){
if (config->getBool("realtime")){return false;}
if (isLiveDVR){
return true;
}
@ -1013,6 +1013,8 @@ namespace Mist{
// overwrite trackId on success
Bit::htobl(thisPacket.getData() + 8, tid);
Bit::htobll(thisPacket.getData() + 12, packetTime);
thisTime = packetTime;
thisIdx = tid;
return; // Success!
}
@ -1071,6 +1073,7 @@ namespace Mist{
// Note: bpos is overloaded here for playlist entry!
void inputHLS::seek(uint64_t seekTime, size_t idx){
if (idx == INVALID_TRACK_ID){return;}
plsTimeOffset.clear();
plsLastTime.clear();
plsInterval.clear();

View file

@ -108,6 +108,8 @@ namespace Mist{
thisPacket.genericFill(thisPos.time / 10000, thisPos.offset / 10000, thisPos.trackId,
dataPointer, thisPos.size, 0, thisPos.isKeyFrame);
thisTime = thisPos.time/1000;
thisIdx = thisPos.trackId;
if (buffered.size() < 2 * (idx == INVALID_TRACK_ID ? M.getValidTracks().size() : 1)){
std::set<size_t> validTracks = M.getValidTracks();

View file

@ -51,7 +51,7 @@ namespace Mist{
bool inputMP3::readHeader(){
if (!inFile){return false;}
meta.reInit(config->getString("streamname"));
meta.reInit(isSingular() ? streamName : "");
size_t tNum = meta.addTrack();
meta.setID(tNum, tNum);
meta.setType(tNum, "audio");
@ -142,13 +142,16 @@ namespace Mist{
fseek(inFile, filePos + dataSize, SEEK_SET);
// Create a json value with the right data
thisPacket.genericFill(timestamp, 0, idx, packHeader, dataSize, filePos, false);
thisPacket.genericFill(timestamp, 0, 0, packHeader, dataSize, filePos, false);
thisTime = timestamp;
thisIdx = 0;
// Update the internal timestamp
timestamp += (sampleCount / (sampleRate / 1000));
}
void inputMP3::seek(uint64_t seekTime, size_t idx){
idx = 0;
DTSC::Keys keys(M.keys(idx));
uint32_t keyNum = M.getKeyNumForTime(idx, seekTime);
fseek(inFile, keys.getBpos(keyNum), SEEK_SET);

View file

@ -209,6 +209,7 @@ namespace Mist{
INFO_MSG("inFile failed!");
return false;
}
if (readExistingHeader()){return true;}
// first we get the necessary header parts
size_t tNumber = 0;
@ -238,7 +239,7 @@ namespace Mist{
if (readExistingHeader()){return true;}
HIGH_MSG("Not read existing header");
meta.reInit(streamName);
meta.reInit(isSingular() ? streamName : "");
tNumber = 0;
// Create header file from MP4 data
@ -508,6 +509,8 @@ namespace Mist{
}else{
thisPacket.genericFill(curPart.time, curPart.offset, curPart.trackID, data, curPart.size, 0, isKeyframe);
}
thisTime = curPart.time;
thisIdx = curPart.trackID;
// get the next part for this track
curPart.index++;

View file

@ -41,8 +41,14 @@ namespace Mist{
void inputPlaylist::streamMainLoop(){
bool seenValidEntry = true;
uint64_t startTime = Util::bootMS();
Comms::Users killSwitch;
killSwitch.reload(streamName, (size_t)INVALID_TRACK_ID, (uint8_t)(COMM_STATUS_ACTIVE | COMM_STATUS_DONOTTRACK));
while (config->is_active){
if (killSwitch && killSwitch.getStatus() & COMM_STATUS_REQDISCONNECT){
Util::logExitReason("buffer requested shutdown");
config->is_active = false;
break;
}
struct tm *wTime;
time_t nowTime = time(0);
wTime = localtime(&nowTime);
@ -77,7 +83,6 @@ namespace Mist{
std::map<std::string, std::string> overrides;
overrides["realtime"] = "1";
overrides["alwaysStart"] = ""; // Just making this value "available" is enough
overrides["simulated-starttime"] = JSON::Value(startTime).asString();
std::string srcPath = config->getString("input");
if ((currentSource.size() && currentSource[0] == '/') || srcPath.rfind('/') == std::string::npos){
srcPath = currentSource;
@ -106,6 +111,11 @@ namespace Mist{
}
seenValidEntry = true;
while (Util::Procs::isRunning(spawn_pid) && config->is_active){
if (killSwitch && killSwitch.getStatus() & COMM_STATUS_REQDISCONNECT){
Util::logExitReason("buffer requested shutdown");
config->is_active = false;
break;
}
Util::sleep(1000);
if (reloadOn != 0xFFFF){
time_t nowTime = time(0);

View file

@ -344,7 +344,7 @@ namespace Mist{
///\todo Find errors, perhaps parts can be made more modular
bool inputTS::readHeader(){
if (!inFile){return false;}
meta.reInit(streamName);
meta.reInit(isSingular() ? streamName : "");
TS::Packet packet; // to analyse and extract data
DTSC::Packet headerPack;
fseek(inFile, 0, SEEK_SET); // seek to beginning
@ -430,7 +430,8 @@ namespace Mist{
return;
}
tsStream.initializeMetadata(meta);
size_t thisIdx = M.trackIDToIndex(thisPacket.getTrackId(), getpid());
thisIdx = M.trackIDToIndex(thisPacket.getTrackId(), getpid());
thisTime = thisPacket.getTime();
if (thisIdx == INVALID_TRACK_ID){getNext(idx);}
}

View file

@ -37,15 +37,15 @@ namespace Mist{
/// Buffering itself is done by bufferNext().
///\param tid The trackid of the page to start buffering
///\param pageNumber The number of the page to start buffering
bool InOutBase::bufferStart(size_t idx, uint32_t pageNumber, IPC::sharedPage & page){
bool InOutBase::bufferStart(size_t idx, uint32_t pageNumber, IPC::sharedPage & page, DTSC::Meta & aMeta){
VERYHIGH_MSG("bufferStart for stream %s, track %zu, page %" PRIu32, streamName.c_str(), idx, pageNumber);
// Initialize the stream metadata if it does not yet exist
#ifndef TSLIVE_INPUT
if (!meta){meta.reInit(streamName);}
if (!aMeta){aMeta.reInit(streamName);}
#endif
if (!meta.getValidTracks().size()){
meta.clear();
if (!aMeta.getValidTracks().size()){
aMeta.clear();
return false;
}
@ -56,7 +56,7 @@ namespace Mist{
page.close();
}
Util::RelAccX &tPages = meta.pages(idx);
Util::RelAccX &tPages = aMeta.pages(idx);
uint32_t pageIdx = INVALID_KEY_NUM;
for (uint32_t i = tPages.getDeleted(); i < tPages.getEndPos(); i++){
@ -79,7 +79,7 @@ namespace Mist{
}
// If the page is already buffered, ignore this request
if (isBuffered(idx, pageNumber)){
if (isBuffered(idx, pageNumber, aMeta)){
INFO_MSG("Page %" PRIu32 " on track %zu already buffered", pageNumber, idx);
///\return false if the page was already buffered.
return false;
@ -171,16 +171,16 @@ namespace Mist{
/// Checks whether a key is buffered
///\param tid The trackid on which to locate the key
///\param keyNum The number of the keyframe to find
bool InOutBase::isBuffered(size_t idx, uint32_t keyNum){
bool InOutBase::isBuffered(size_t idx, uint32_t keyNum, DTSC::Meta & aMeta){
///\return The result of bufferedOnPage(tid, keyNum)
return bufferedOnPage(idx, keyNum) != INVALID_KEY_NUM;
return bufferedOnPage(idx, keyNum, aMeta) != INVALID_KEY_NUM;
}
/// Returns the pagenumber where this key is buffered on
///\param tid The trackid on which to locate the key
///\param keyNum The number of the keyframe to find
uint32_t InOutBase::bufferedOnPage(size_t idx, uint32_t keyNum){
Util::RelAccX &tPages = meta.pages(idx);
uint32_t InOutBase::bufferedOnPage(size_t idx, uint32_t keyNum, DTSC::Meta & aMeta){
Util::RelAccX &tPages = aMeta.pages(idx);
for (uint64_t i = tPages.getDeleted(); i < tPages.getEndPos(); i++){
uint64_t pageNum = tPages.getInt("firstkey", i);
@ -198,6 +198,13 @@ namespace Mist{
///\param pack The packet to buffer
void InOutBase::bufferNext(uint64_t packTime, int64_t packOffset, uint32_t packTrack, const char *packData,
size_t packDataSize, uint64_t packBytePos, bool isKeyframe, IPC::sharedPage & page){
bufferNext(packTime, packOffset, packTrack, packData, packDataSize, packBytePos, isKeyframe, page, meta);
}
/// Buffers the next packet on the currently opened page
///\param pack The packet to buffer
void InOutBase::bufferNext(uint64_t packTime, int64_t packOffset, uint32_t packTrack, const char *packData,
size_t packDataSize, uint64_t packBytePos, bool isKeyframe, IPC::sharedPage & page, DTSC::Meta & aMeta){
size_t packDataLen =
24 + (packOffset ? 17 : 0) + (packBytePos ? 15 : 0) + (isKeyframe ? 19 : 0) + packDataSize + 11;
@ -209,10 +216,10 @@ namespace Mist{
}
// these checks were already done in bufferSinglePacket, but we check again just to be sure
if (!meta.getVod() && packTime < meta.getLastms(packTrack)){
if (!aMeta.getVod() && packTime < aMeta.getLastms(packTrack)){
DEBUG_MSG(((multiWrong == 0) ? DLVL_WARN : DLVL_HIGH),
"Wrong order on track %" PRIu32 " ignored: %" PRIu64 " < %" PRIu64, packTrack,
packTime, meta.getLastms(packTrack));
packTime, aMeta.getLastms(packTrack));
multiWrong = true;
return;
}
@ -223,7 +230,7 @@ namespace Mist{
}
multiWrong = false;
Util::RelAccX &tPages = meta.pages(packTrack);
Util::RelAccX &tPages = aMeta.pages(packTrack);
uint32_t pageIdx = 0;
uint32_t currPagNum = atoi(page.name.data() + page.name.rfind('_') + 1);
Util::RelAccXFieldData firstkey = tPages.getFieldData("firstkey");
@ -286,9 +293,7 @@ namespace Mist{
}
/// Wraps up the buffering of a shared memory data page
///
/// Registers the data page on the track index page as well
///\param tid The trackid of the page to finalize
/// \param idx The track index of the page to finalize
void InOutBase::bufferFinalize(size_t idx, IPC::sharedPage & page){
// If no page is open, do nothing
if (!page){
@ -318,11 +323,7 @@ namespace Mist{
}
/// Buffers a live packet to a page.
///
/// Handles both buffering and creation of new pages
///
/// Initiates/continues negotiation with the buffer as well
///\param packet The packet to buffer
/// Calls bufferLivePacket with full arguments internally.
void InOutBase::bufferLivePacket(const DTSC::Packet &packet){
size_t idx = M.trackIDToIndex(packet.getTrackId(), getpid());
if (idx == INVALID_TRACK_ID){
@ -337,19 +338,28 @@ namespace Mist{
/// \TODO META Build something that should actually be able to deal with "extra" values
}
/// Calls bufferLivePacket with additional argument for internal metadata reference internally.
void InOutBase::bufferLivePacket(uint64_t packTime, int64_t packOffset, uint32_t packTrack, const char *packData,
size_t packDataSize, uint64_t packBytePos, bool isKeyframe){
meta.reloadReplacedPagesIfNeeded();
meta.setLive(true);
bufferLivePacket(packTime, packOffset, packTrack, packData, packDataSize, packBytePos, isKeyframe, meta);
}
///Buffers the given packet data into the given metadata structure.
///Uses class member variables livePage and curPageNum internally for bookkeeping.
///These member variables are not (and should not, in the future) be accessed anywhere else.
void InOutBase::bufferLivePacket(uint64_t packTime, int64_t packOffset, uint32_t packTrack, const char *packData,
size_t packDataSize, uint64_t packBytePos, bool isKeyframe, DTSC::Meta &aMeta){
aMeta.reloadReplacedPagesIfNeeded();
aMeta.setLive(true);
// Store the trackid for easier access
// Do nothing if the trackid is invalid
if (packTrack == INVALID_TRACK_ID){return;}
// Store the trackid for easier access
Util::RelAccX &tPages = meta.pages(packTrack);
Util::RelAccX &tPages = aMeta.pages(packTrack);
if (M.getType(packTrack) != "video"){
if (aMeta.getType(packTrack) != "video"){
isKeyframe = false;
if (!tPages.getEndPos() || !livePage[packTrack]){
// Assume this is the first packet on the track
@ -363,20 +373,20 @@ namespace Mist{
// For live streams, ignore packets that make no sense
// This also happens in bufferNext, with the same rules
if (M.getLive()){
if (packTime < M.getLastms(packTrack)){
if (aMeta.getLive()){
if (packTime < aMeta.getLastms(packTrack)){
HIGH_MSG("Wrong order on track %" PRIu32 " ignored: %" PRIu64 " < %" PRIu64, packTrack,
packTime, M.getLastms(packTrack));
packTime, aMeta.getLastms(packTrack));
return;
}
if (packTime > M.getLastms(packTrack) + 30000 && M.getLastms(packTrack)){
WARN_MSG("Sudden jump in timestamp from %" PRIu64 " to %" PRIu64, M.getLastms(packTrack), packTime);
if (packTime > aMeta.getLastms(packTrack) + 30000 && aMeta.getLastms(packTrack)){
WARN_MSG("Sudden jump in timestamp from %" PRIu64 " to %" PRIu64, aMeta.getLastms(packTrack), packTime);
}
}
// Determine if we need to open the next page
if (isKeyframe){
updateTrackFromKeyframe(packTrack, packData, packDataSize);
updateTrackFromKeyframe(packTrack, packData, packDataSize, aMeta);
uint64_t endPage = tPages.getEndPos();
size_t curPage = 0;
size_t currPagNum = atoi(livePage[packTrack].name.data() + livePage[packTrack].name.rfind('_') + 1);
@ -390,15 +400,15 @@ namespace Mist{
// If there is no page, create it
if (!livePage[packTrack]){
size_t keyNum = M.getKeyNumForTime(packTrack, packTime);
size_t keyNum = aMeta.getKeyNumForTime(packTrack, packTime);
if (keyNum == INVALID_KEY_NUM){
curPageNum[packTrack] = 0;
}else{
curPageNum[packTrack] = M.getKeyNumForTime(packTrack, packTime) + 1;
curPageNum[packTrack] = aMeta.getKeyNumForTime(packTrack, packTime) + 1;
}
if ((tPages.getEndPos() - tPages.getDeleted()) >= tPages.getRCount()){
meta.resizeTrack(packTrack, M.fragments(packTrack).getRCount(), M.keys(packTrack).getRCount(), M.parts(packTrack).getRCount(), tPages.getRCount() * 2, "not enough pages");
aMeta.resizeTrack(packTrack, aMeta.fragments(packTrack).getRCount(), aMeta.keys(packTrack).getRCount(), aMeta.parts(packTrack).getRCount(), tPages.getRCount() * 2, "not enough pages");
}
tPages.addRecords(1);
@ -409,7 +419,7 @@ namespace Mist{
tPages.setInt("avail", 0, endPage);
curPage = endPage;
DONTEVEN_MSG("Opening new page #%zu to track %" PRIu32, curPageNum[packTrack], packTrack);
if (!bufferStart(packTrack, curPageNum[packTrack], livePage[packTrack])){
if (!bufferStart(packTrack, curPageNum[packTrack], livePage[packTrack], aMeta)){
// if this fails, return instantly without actually buffering the packet
WARN_MSG("Dropping packet %s:%" PRIu32 "@%" PRIu64, streamName.c_str(), packTrack, packTime);
return;
@ -424,7 +434,7 @@ namespace Mist{
tPages.getInt("firstkey", curPage), packTrack, curPageNum[packTrack]);
if ((tPages.getEndPos() - tPages.getDeleted()) >= tPages.getRCount()){
meta.resizeTrack(packTrack, M.fragments(packTrack).getRCount(), M.keys(packTrack).getRCount(), M.parts(packTrack).getRCount(), tPages.getRCount() * 2, "not enough pages");
aMeta.resizeTrack(packTrack, aMeta.fragments(packTrack).getRCount(), aMeta.keys(packTrack).getRCount(), aMeta.parts(packTrack).getRCount(), tPages.getRCount() * 2, "not enough pages");
}
tPages.addRecords(1);
@ -436,7 +446,7 @@ namespace Mist{
curPage = endPage;
if (livePage[packTrack]){bufferFinalize(packTrack, livePage[packTrack]);}
DONTEVEN_MSG("Opening new page #%zu to track %" PRIu32, curPageNum[packTrack], packTrack);
if (!bufferStart(packTrack, curPageNum[packTrack], livePage[packTrack])){
if (!bufferStart(packTrack, curPageNum[packTrack], livePage[packTrack], aMeta)){
// if this fails, return instantly without actually buffering the packet
WARN_MSG("Dropping packet %s:%" PRIu32 "@%" PRIu64, streamName.c_str(), packTrack, packTime);
return;
@ -462,13 +472,13 @@ namespace Mist{
// Buffer the packet
DONTEVEN_MSG("Buffering live packet (%zuB) @%" PRIu64 " ms on track %" PRIu32 " with offset %" PRIu64, packDataSize, packTime, packTrack, packOffset);
bufferNext(packTime, packOffset, packTrack, packData, packDataSize, packBytePos, isKeyframe, livePage[packTrack]);
meta.update(packTime, packOffset, packTrack, packDataSize, packBytePos, isKeyframe);
bufferNext(packTime, packOffset, packTrack, packData, packDataSize, packBytePos, isKeyframe, livePage[packTrack], aMeta);
aMeta.update(packTime, packOffset, packTrack, packDataSize, packBytePos, isKeyframe);
}
///Handles updating track metadata from a new keyframe, if applicable
void InOutBase::updateTrackFromKeyframe(uint32_t packTrack, const char *packData, size_t packDataSize){
if (meta.getCodec(packTrack) == "H264"){
void InOutBase::updateTrackFromKeyframe(uint32_t packTrack, const char *packData, size_t packDataSize, DTSC::Meta & aMeta){
if (aMeta.getCodec(packTrack) == "H264"){
//H264 packets are 4-byte size-prepended NAL units
size_t offset = 0;
while (offset+4 < packDataSize){
@ -480,14 +490,14 @@ namespace Mist{
uint8_t nalType = (packData[offset+4] & 0x1F);
if (nalType == 7){//SPS, update width/height/FPS
h264::SPSMeta hMeta = h264::sequenceParameterSet(packData+offset+4, nalLen).getCharacteristics();
meta.setWidth(packTrack, hMeta.width);
meta.setHeight(packTrack, hMeta.height);
meta.setFpks(packTrack, hMeta.fps*1000);
aMeta.setWidth(packTrack, hMeta.width);
aMeta.setHeight(packTrack, hMeta.height);
aMeta.setFpks(packTrack, hMeta.fps*1000);
}
offset += nalLen+4;
}
}
if (meta.getCodec(packTrack) == "VP8"){
if (aMeta.getCodec(packTrack) == "VP8"){
//VP8 packets have a simple header for keyframes
//Reference: https://www.rfc-editor.org/rfc/rfc6386.html#section-9.1
if (packData[3] == 0x9d && packData[4] == 0x01 && packData[5] == 0x2a){
@ -506,8 +516,8 @@ namespace Mist{
case 2: h *= 5/3; break;
case 3: h *= 2; break;
}
meta.setWidth(packTrack, w);
meta.setHeight(packTrack, h);
aMeta.setWidth(packTrack, w);
aMeta.setHeight(packTrack, h);
}
}
}

View file

@ -14,27 +14,33 @@ namespace Mist{
public:
InOutBase();
bool isBuffered(size_t idx, uint32_t keyNum);
uint32_t bufferedOnPage(size_t idx, uint32_t keyNum);
bool isBuffered(size_t idx, uint32_t keyNum, DTSC::Meta & aMeta);
uint32_t bufferedOnPage(size_t idx, uint32_t keyNum, DTSC::Meta & aMeta);
size_t getMainSelectedTrack();
bool bufferStart(size_t idx, uint32_t pageNumber, IPC::sharedPage & page);
bool bufferStart(size_t idx, uint32_t pageNumber, IPC::sharedPage & page, DTSC::Meta & aMeta);
void bufferFinalize(size_t idx, IPC::sharedPage & page);
bool isCurrentLivePage(size_t idx, uint32_t pageNumber);
void bufferRemove(size_t idx, uint32_t pageNumber);
void bufferLivePacket(const DTSC::Packet &packet);
void bufferNext(uint64_t packTime, int64_t packOffset, uint32_t packTrack, const char *packData,
size_t packDataSize, uint64_t packBytePos, bool isKeyframe, IPC::sharedPage & page, DTSC::Meta & aMeta);
void bufferNext(uint64_t packTime, int64_t packOffset, uint32_t packTrack, const char *packData,
size_t packDataSize, uint64_t packBytePos, bool isKeyframe, IPC::sharedPage & page);
void bufferLivePacket(uint64_t packTime, int64_t packOffset, uint32_t packTrack, const char *packData,
size_t packDataSize, uint64_t packBytePos, bool isKeyframe);
void bufferLivePacket(uint64_t packTime, int64_t packOffset, uint32_t packTrack, const char *packData,
size_t packDataSize, uint64_t packBytePos, bool isKeyframe, DTSC::Meta & aMeta);
protected:
void updateTrackFromKeyframe(uint32_t packTrack, const char *packData, size_t packDataSize);
void updateTrackFromKeyframe(uint32_t packTrack, const char *packData, size_t packDataSize, DTSC::Meta & aMeta);
bool standAlone;
DTSC::Packet thisPacket; // The current packet that is being parsed
size_t thisIdx; //Track index of current packet
uint64_t thisTime; //Time of current packet
std::string streamName;

View file

@ -155,9 +155,6 @@ namespace Mist{
uint64_t firstPacketTime;
uint64_t lastPacketTime;
size_t thisIdx;
uint64_t thisTime;
std::map<size_t, IPC::sharedPage> curPage; ///< For each track, holds the page that is currently being written.
};