Initial TS Input commit

This commit is contained in:
Erik Zandvliet 2015-04-07 14:00:08 +02:00 committed by Thulinma
parent 10f0f6bb92
commit 1f4b523b1b
33 changed files with 1300 additions and 643 deletions

View file

@ -113,6 +113,7 @@ add_definitions(-g -funsigned-char -D_LARGEFILE_SOURCE -D_FILE_OFFSET_BITS=64 -D
# MistLib - Header Files # # MistLib - Header Files #
######################################## ########################################
set(libHeaders set(libHeaders
${SOURCE_DIR}/lib/adts.h
${SOURCE_DIR}/lib/amf.h ${SOURCE_DIR}/lib/amf.h
${SOURCE_DIR}/lib/auth.h ${SOURCE_DIR}/lib/auth.h
${SOURCE_DIR}/lib/base64.h ${SOURCE_DIR}/lib/base64.h
@ -124,6 +125,7 @@ set(libHeaders
${SOURCE_DIR}/lib/dtsc.h ${SOURCE_DIR}/lib/dtsc.h
${SOURCE_DIR}/lib/encryption.h ${SOURCE_DIR}/lib/encryption.h
${SOURCE_DIR}/lib/flv_tag.h ${SOURCE_DIR}/lib/flv_tag.h
${SOURCE_DIR}/lib/h264.h
${SOURCE_DIR}/lib/http_parser.h ${SOURCE_DIR}/lib/http_parser.h
${SOURCE_DIR}/lib/json.h ${SOURCE_DIR}/lib/json.h
${SOURCE_DIR}/lib/mp4_adobe.h ${SOURCE_DIR}/lib/mp4_adobe.h
@ -146,6 +148,7 @@ set(libHeaders
${SOURCE_DIR}/lib/timing.h ${SOURCE_DIR}/lib/timing.h
${SOURCE_DIR}/lib/tinythread.h ${SOURCE_DIR}/lib/tinythread.h
${SOURCE_DIR}/lib/ts_packet.h ${SOURCE_DIR}/lib/ts_packet.h
${SOURCE_DIR}/lib/ts_stream.h
${SOURCE_DIR}/lib/vorbis.h ${SOURCE_DIR}/lib/vorbis.h
) )
@ -153,6 +156,7 @@ set(libHeaders
# MistLib - Source Files # # MistLib - Source Files #
######################################## ########################################
set(libSources set(libSources
${SOURCE_DIR}/lib/adts.cpp
${SOURCE_DIR}/lib/amf.cpp ${SOURCE_DIR}/lib/amf.cpp
${SOURCE_DIR}/lib/auth.cpp ${SOURCE_DIR}/lib/auth.cpp
${SOURCE_DIR}/lib/base64.cpp ${SOURCE_DIR}/lib/base64.cpp
@ -163,6 +167,7 @@ set(libSources
${SOURCE_DIR}/lib/dtscmeta.cpp ${SOURCE_DIR}/lib/dtscmeta.cpp
${SOURCE_DIR}/lib/encryption.cpp ${SOURCE_DIR}/lib/encryption.cpp
${SOURCE_DIR}/lib/flv_tag.cpp ${SOURCE_DIR}/lib/flv_tag.cpp
${SOURCE_DIR}/lib/h264.cpp
${SOURCE_DIR}/lib/http_parser.cpp ${SOURCE_DIR}/lib/http_parser.cpp
${SOURCE_DIR}/lib/json.cpp ${SOURCE_DIR}/lib/json.cpp
${SOURCE_DIR}/lib/mp4_adobe.cpp ${SOURCE_DIR}/lib/mp4_adobe.cpp
@ -184,6 +189,7 @@ set(libSources
${SOURCE_DIR}/lib/timing.cpp ${SOURCE_DIR}/lib/timing.cpp
${SOURCE_DIR}/lib/tinythread.cpp ${SOURCE_DIR}/lib/tinythread.cpp
${SOURCE_DIR}/lib/ts_packet.cpp ${SOURCE_DIR}/lib/ts_packet.cpp
${SOURCE_DIR}/lib/ts_stream.cpp
${SOURCE_DIR}/lib/vorbis.cpp ${SOURCE_DIR}/lib/vorbis.cpp
) )
######################################## ########################################
@ -243,6 +249,8 @@ makeAnalyser(MP4 mp4)
makeAnalyser(OGG ogg) makeAnalyser(OGG ogg)
makeAnalyser(RTP rtp) #LTS makeAnalyser(RTP rtp) #LTS
makeAnalyser(RTSP rtsp_rtp) #LTS makeAnalyser(RTSP rtsp_rtp) #LTS
makeAnalyser(TS ts) #LTS
makeAnalyser(TSStream tsstream) #LTS
makeAnalyser(Stats stats) #LTS makeAnalyser(Stats stats) #LTS
######################################## ########################################
@ -260,9 +268,21 @@ macro(makeInput inputName format)
src/input/input_${format}.cpp src/input/input_${format}.cpp
src/io.cpp src/io.cpp
) )
#Set compile definitions
unset(my_definitions)
if (";${ARGN};" MATCHES ";nolock;")#Currently only used in TSStream
list(APPEND my_definitions "INPUT_NOLOCK")
endif()
if (";${ARGN};" MATCHES ";live;")#Currently only used in TSStream
list(APPEND my_definitions "INPUT_LIVE")
endif()
list(APPEND my_definitions "INPUTTYPE=\"input_${format}.h\"")
set_target_properties(MistIn${inputName} set_target_properties(MistIn${inputName}
PROPERTIES COMPILE_DEFINITIONS INPUTTYPE=\"input_${format}.h\" PROPERTIES COMPILE_DEFINITIONS "${my_definitions}"
) )
target_link_libraries(MistIn${inputName} target_link_libraries(MistIn${inputName}
mist mist
) )
@ -281,6 +301,7 @@ makeInput(Buffer buffer)
makeInput(ISMV ismv)#LTS makeInput(ISMV ismv)#LTS
makeInput(MP4 mp4)#LTS makeInput(MP4 mp4)#LTS
makeInput(TS ts)#LTS makeInput(TS ts)#LTS
makeInput(TSStream ts nolock live)#LTS
makeInput(Folder folder folder)#LTS makeInput(Folder folder folder)#LTS
######################################## ########################################

View file

@ -152,6 +152,12 @@ MistInTS: override CPPFLAGS += "-DINPUTTYPE=\"input_ts.h\""
MistInTS: src/input/mist_in.cpp src/input/input.cpp src/input/input_ts.cpp src/io.cpp MistInTS: src/input/mist_in.cpp src/input/input.cpp src/input/input_ts.cpp src/io.cpp
$(CXX) $(LDFLAGS) $(CPPFLAGS) $^ $(LDLIBS) -o $@ $(CXX) $(LDFLAGS) $(CPPFLAGS) $^ $(LDLIBS) -o $@
inputs: MistInTSLive
MistInTSLive: override LDLIBS += $(THREADLIB)
MistInTSLive: override CPPFLAGS += "-DINPUTTYPE=\"input_tslive.h\""
MistInTSLive: src/input/mist_in.cpp src/input/input.cpp src/input/input_tslive.cpp src/io.cpp
$(CXX) $(LDFLAGS) $(CPPFLAGS) $^ $(LDLIBS) -o $@
inputs: MistInMP4 inputs: MistInMP4
MistInMP4: override LDLIBS += $(THREADLIB) MistInMP4: override LDLIBS += $(THREADLIB)
MistInMP4: override CPPFLAGS += "-DINPUTTYPE=\"input_mp4.h\"" MistInMP4: override CPPFLAGS += "-DINPUTTYPE=\"input_mp4.h\""

123
lib/adts.cpp Normal file
View file

@ -0,0 +1,123 @@
#include "adts.h"
#include <cstdlib>
#include <cstring>
#include <sstream>
#include "defines.h"
namespace aac {
adts::adts(){
data = NULL;
len = 0;
}
adts::adts(char * _data, unsigned long _len){
len = _len;
data = (char*)malloc(len);
memcpy(data, _data, len);
}
adts::adts(const adts & rhs){
*this = rhs;
}
adts& adts::operator = (const adts & rhs){
len = rhs.len;
data = (char*)malloc(len);
memcpy(data, rhs.data, len);
return * this;
}
adts::~adts(){
if (data){
free(data);
}
}
unsigned long adts::getAACProfile(){
if (!data || !len){
return 0;
}
return ((data[2] >> 6) & 0x03) + 1;
}
unsigned long adts::getFrequencyIndex(){
if (!data || !len){
return 0;
}
return ((data[2] >> 2) & 0x0F);
}
unsigned long adts::getFrequency(){
if (!data || !len){
return 0;
}
switch(getFrequencyIndex()){
case 0: return 96000; break;
case 1: return 88200; break;
case 2: return 64000; break;
case 3: return 48000; break;
case 4: return 44100; break;
case 5: return 32000; break;
case 6: return 24000; break;
case 7: return 22050; break;
case 8: return 16000; break;
case 9: return 12000; break;
case 10: return 11025; break;
case 11: return 8000; break;
case 12: return 7350; break;
default: return 0; break;
}
}
unsigned long adts::getChannelConfig(){
if (!data || !len){
return 0;
}
return ((data[2] & 0x01) << 2) | ((data[3] >> 6) & 0x03);
}
unsigned long adts::getChannelCount(){
if (!data || !len){
return 0;
}
return (getChannelConfig() == 7 ? 8 : getChannelConfig());
}
unsigned long adts::getHeaderSize(){
if (!data || !len){
return 0;
}
return (data[1] & 0x01 ? 7 : 9);
}
unsigned long adts::getPayloadSize(){
if (!data || !len){
return 0;
}
return (((data[3] & 0x03) << 11) | (data[4] << 3) | ((data[5] >> 5) & 0x07)) - getHeaderSize();
}
unsigned long adts::getSampleCount(){
if (!data || !len){
return 0;
}
return ((data[6] & 0x03) + 1) * 1024;//Number of samples in this frame * 1024
}
char * adts::getPayload() {
if (!data || !len){
return 0;
}
return data + getHeaderSize();
}
std::string adts::toPrettyString(){
std::stringstream res;
res << "SyncWord: " << std::hex << (((int)data[0] << 4) | ((data[1] >> 4) & 0x0F)) << std::endl;
res << "HeaderSize: " << std::dec << getHeaderSize() << std::endl;
res << "PayloadSize: " << std::dec << getPayloadSize() << std::endl;
return res.str();
}
}

25
lib/adts.h Normal file
View file

@ -0,0 +1,25 @@
#include <string>
namespace aac {
class adts {
public:
adts();
adts(char * _data, unsigned long _len);
adts(const adts & rhs);
~adts();
adts& operator = (const adts & rhs);
unsigned long getAACProfile();
unsigned long getFrequencyIndex();
unsigned long getFrequency();
unsigned long getChannelConfig();
unsigned long getChannelCount();
unsigned long getHeaderSize();
unsigned long getPayloadSize();
unsigned long getSampleCount();
char * getPayload();
std::string toPrettyString();
private:
char * data;
unsigned long len;
};
}

View file

@ -26,7 +26,7 @@ namespace Utils {
} }
} }
void bitstream::append(char * input, size_t bytes) { void bitstream::append(const char * input, size_t bytes) {
if (checkBufferSize(dataSize + bytes)) { if (checkBufferSize(dataSize + bytes)) {
memcpy(data + dataSize, input, bytes); memcpy(data + dataSize, input, bytes);
dataSize += bytes; dataSize += bytes;

View file

@ -12,7 +12,7 @@ namespace Utils {
append(std::string(input, 1)); append(std::string(input, 1));
return *this; return *this;
}; };
void append(char * input, size_t bytes); void append(const char * input, size_t bytes);
void append(std::string input); void append(std::string input);
long long unsigned int size(); long long unsigned int size();
void skip(size_t count); void skip(size_t count);

View file

@ -108,7 +108,7 @@ namespace DTSC {
operator bool() const; operator bool() const;
packType getVersion() const; packType getVersion() const;
void reInit(const char * data_, unsigned int len, bool noCopy = false); void reInit(const char * data_, unsigned int len, bool noCopy = false);
void genericFill(long long packTime, long long packOffset, long long packTrack, char * packData, long long packDataSize, long long packBytePos, bool isKeyframe); void genericFill(long long packTime, long long packOffset, long long packTrack, const char * packData, long long packDataSize, long long packBytePos, bool isKeyframe);
void getString(const char * identifier, char *& result, unsigned int & len) const; void getString(const char * identifier, char *& result, unsigned int & len) const;
void getString(const char * identifier, std::string & result) const; void getString(const char * identifier, std::string & result) const;
void getInt(const char * identifier, int & result) const; void getInt(const char * identifier, int & result) const;

View file

@ -19,7 +19,10 @@ namespace DTSC {
/// Copy constructor for packets, copies an existing packet with same noCopy flag as original. /// Copy constructor for packets, copies an existing packet with same noCopy flag as original.
Packet::Packet(const Packet & rhs) { Packet::Packet(const Packet & rhs) {
Packet(rhs.data, rhs.dataLen, !rhs.master); master = false;
bufferLen = 0;
data = NULL;
reInit(rhs.data, rhs.dataLen, !rhs.master);
} }
/// Data constructor for packets, either references or copies a packet from raw data. /// Data constructor for packets, either references or copies a packet from raw data.
@ -112,7 +115,7 @@ namespace DTSC {
///\param noCopy Determines whether to make a copy or not ///\param noCopy Determines whether to make a copy or not
void Packet::reInit(const char * data_, unsigned int len, bool noCopy) { void Packet::reInit(const char * data_, unsigned int len, bool noCopy) {
if (!data_) { if (!data_) {
DEBUG_MSG(DLVL_DEVEL, "ReInit received a null pointer with len %d, ignoring", len); HIGH_MSG("ReInit received a null pointer with len %d, ignoring", len);
null(); null();
return; return;
} }
@ -168,7 +171,8 @@ namespace DTSC {
} }
/// Re-initializes this Packet to contain a generic DTSC packet with the given data fields. /// Re-initializes this Packet to contain a generic DTSC packet with the given data fields.
void Packet::genericFill(long long packTime, long long packOffset, long long packTrack, char * packData, long long packDataSize, long long packBytePos, bool isKeyframe){ /// When given a NULL pointer, the data is reserved and memset to 0
void Packet::genericFill(long long packTime, long long packOffset, long long packTrack, const char * packData, long long packDataSize, long long packBytePos, bool isKeyframe){
null(); null();
master = true; master = true;
//time and trackID are part of the 20-byte header. //time and trackID are part of the 20-byte header.
@ -217,7 +221,11 @@ namespace DTSC {
memcpy(data+offset, "\000\004data\002", 7); memcpy(data+offset, "\000\004data\002", 7);
tmpLong = htonl(packDataSize); tmpLong = htonl(packDataSize);
memcpy(data+offset+7, (char *)&tmpLong, 4); memcpy(data+offset+7, (char *)&tmpLong, 4);
memcpy(data+offset+11, packData, packDataSize); if (packData){
memcpy(data+offset+11, packData, packDataSize);
}else{
memset(data+offset+11, 0, packDataSize);
}
//finish container with 0x0000EE //finish container with 0x0000EE
memcpy(data+offset+11+packDataSize, "\000\000\356", 3); memcpy(data+offset+11+packDataSize, "\000\000\356", 3);
} }

71
lib/h264.cpp Normal file
View file

@ -0,0 +1,71 @@
#include "h264.h"
#include <cstdlib>
#include <cstring>
#include "bitfields.h"
#include "defines.h"
namespace h264 {
unsigned long toAnnexB(const char * data, unsigned long dataSize, char *& result){
//toAnnexB keeps the same size.
if (!result){
result = (char *)malloc(dataSize);
}
int offset = 0;
while (offset < dataSize){
//Read unit size
unsigned long unitSize = Bit::btohl(data + offset);
//Write annex b header
memset(result + offset, 0x00, 3);
result[offset + 3] = 0x01;
//Copy the nal unit
memcpy(result + offset + 4, data + offset + 4, unitSize);
//Update the offset
offset += 4 + unitSize;
}
return dataSize;
}
unsigned long fromAnnexB(const char * data, unsigned long dataSize, char *& result){
if (!result){
//first compute the new size. This might be the same as the annex b version, but this is not guaranteed
int offset = 0;
int newSize = 0;
while (offset < dataSize){
const char * begin = (const char*)memmem(data + offset, dataSize - offset, "\000\000\001", 3);
begin += 3;//Initialize begin after the first 0x000001 pattern.
const char * end = (const char*)memmem(begin, dataSize - (begin - data), "\000\000\001", 3);
if (end - data > dataSize){
end = data + dataSize;
}
//Check for 4-byte lead in's. Yes, we access -1 here
if (end[-1] == 0x00){
end--;
}
newSize += 4 + (end - begin);//end - begin = nalSize
offset = end - data;
}
result = (char *)malloc(newSize);
}
int offset = 0;
int newOffset = 0;
while (offset < dataSize){
const char * begin = ((const char*)memmem(data + offset, dataSize - offset, "\000\000\001", 3)) + 3;//Initialize begin after the first 0x000001 pattern.
const char * end = (const char*)memmem(begin, dataSize - (begin - data), "\000\000\001", 3);
if (end - data > dataSize){
end = data + dataSize;
}
//Check for 4-byte lead in's. Yes, we access -1 here
if (end[-1] == 0x00){
end--;
}
unsigned int nalSize = end - begin;
Bit::htobl(result + newOffset, nalSize);
memcpy(result + newOffset + 4, begin, nalSize);
newOffset += 4 + nalSize;
offset = end - data;
}
return newOffset;
}
}

4
lib/h264.h Normal file
View file

@ -0,0 +1,4 @@
namespace h264 {
unsigned long toAnnexB(const char * data, unsigned long dataSize, char *& result);
unsigned long fromAnnexB(const char * data, unsigned long dataSize, char *& result);
}

View file

@ -21,6 +21,20 @@ namespace h264 {
return result; return result;
} }
std::deque<nalData> analyseH264Packet(const char * data, unsigned long len){
std::deque<nalData> res;
int offset = 0;
while (offset < len){
nalData entry;
entry.nalSize = Bit::btohl(data + offset);
entry.nalType = (data + offset)[4] & 0x1F;
res.push_back(entry);
offset += entry.nalSize + 4;
}
return res;
}
///empty constructor of NAL ///empty constructor of NAL
NAL::NAL() { NAL::NAL() {
@ -518,5 +532,111 @@ namespace h264 {
std::cout << "second_chroma_qp_index_offset: " << bs.getExpGolomb() << std::endl; std::cout << "second_chroma_qp_index_offset: " << bs.getExpGolomb() << std::endl;
} }
sequenceParameterSet::sequenceParameterSet(const char * _data, unsigned long _dataLen) : data(_data), dataLen(_dataLen) {}
SPSMeta sequenceParameterSet::getCharacteristics() const {
SPSMeta result;
//For calculating width
unsigned int widthInMbs = 0;
unsigned int cropHorizontal = 0;
//For calculating height
bool mbsOnlyFlag = 0;
unsigned int heightInMapUnits = 0;
unsigned int cropVertical = 0;
//Fill the bitstream
Utils::bitstream bs;
for (unsigned int i = 1; i < dataLen; i++) {
if (i + 2 < dataLen && (memcmp(data + i, "\000\000\003", 3) == 0)){//Emulation prevention bytes
//Yes, we increase i here
bs.append(data + i, 2);
i += 2;
} else {
//No we don't increase i here
bs.append(data + i, 1);
}
}
char profileIdc = bs.get(8);
//Start skipping unused data
bs.skip(16);
bs.getUExpGolomb();
if (profileIdc == 100 || profileIdc == 110 || profileIdc == 122 || profileIdc == 244 || profileIdc == 44 || profileIdc == 83 || profileIdc == 86 || profileIdc == 118 || profileIdc == 128) {
//chroma format idc
if (bs.getUExpGolomb() == 3) {
bs.skip(1);
}
bs.getUExpGolomb();
bs.getUExpGolomb();
bs.skip(1);
if (bs.get(1)) {
DEBUG_MSG(DLVL_DEVEL, "Scaling matrix not implemented yet");
}
}
bs.getUExpGolomb();
unsigned int pic_order_cnt_type = bs.getUExpGolomb();
if (!pic_order_cnt_type) {
bs.getUExpGolomb();
} else if (pic_order_cnt_type == 1) {
DEBUG_MSG(DLVL_DEVEL, "This part of the implementation is incomplete(2), to be continued. If this message is shown, contact developers immediately.");
}
bs.getUExpGolomb();
bs.skip(1);
//Stop skipping data and start doing usefull stuff
widthInMbs = bs.getUExpGolomb() + 1;
heightInMapUnits = bs.getUExpGolomb() + 1;
mbsOnlyFlag = bs.get(1);//Gets used in height calculation
if (!mbsOnlyFlag) {
bs.skip(1);
}
bs.skip(1);
//cropping flag
if (bs.get(1)) {
cropHorizontal = bs.getUExpGolomb();//leftOffset
cropHorizontal += bs.getUExpGolomb();//rightOffset
cropVertical = bs.getUExpGolomb();//topOffset
cropVertical += bs.getUExpGolomb();//bottomOffset
}
//vuiParameters
if (bs.get(1)) {
//Skipping all the paramters we dont use
if (bs.get(1)) {
if (bs.get(8) == 255) {
bs.skip(32);
}
}
if (bs.get(1)) {
bs.skip(1);
}
if (bs.get(1)) {
bs.skip(4);
if (bs.get(1)) {
bs.skip(24);
}
}
if (bs.get(1)) {
bs.getUExpGolomb();
bs.getUExpGolomb();
}
//Decode timing info
if (bs.get(1)) {
unsigned int unitsInTick = bs.get(32);
unsigned int timeScale = bs.get(32);
result.fps = (double)timeScale / (2 * unitsInTick);
bs.skip(1);
}
}
result.width = (widthInMbs * 16) - (cropHorizontal * 2);
result.height = ((mbsOnlyFlag ? 1 : 2) * heightInMapUnits * 16) - (cropVertical * 2);
return result;
}
} }

View file

@ -1,9 +1,17 @@
#pragma once
#include <deque>
#include <string> #include <string>
#include <cstdio> #include <cstdio>
#include <deque> #include <deque>
#include "dtsc.h" #include "dtsc.h"
namespace h264 { namespace h264 {
struct nalData {
unsigned char nalType;
unsigned long nalSize;
};
std::deque<nalData> analyseH264Packet(const char * data, unsigned long len);
std::deque<int> parseNalSizes(DTSC::Packet & pack); std::deque<int> parseNalSizes(DTSC::Packet & pack);
///Struct containing pre-calculated metadata of an SPS nal unit. Width and height in pixels, fps in Hz ///Struct containing pre-calculated metadata of an SPS nal unit. Width and height in pixels, fps in Hz
@ -45,4 +53,14 @@ namespace h264 {
PPS(std::string & InputData): NAL(InputData) {}; PPS(std::string & InputData): NAL(InputData) {};
void analyzePPS(); void analyzePPS();
}; };
class sequenceParameterSet {
public:
sequenceParameterSet(const char * _data, unsigned long _dataLen);
SPSMeta getCharacteristics() const;
private:
const char * data;
unsigned long dataLen;
};
}//ns h264 }//ns h264

View file

@ -201,6 +201,31 @@ std::string Util::Procs::getOutputOf(char * const * argv) {
return ret; return ret;
} }
///This function prepares a deque for getOutputOf and automatically inserts a NULL at the end of the char* const*
char* const* Util::Procs::dequeToArgv(std::deque<std::string> & argDeq){
char** ret = (char**)malloc((argDeq.size()+1)*sizeof(char*));
for (int i = 0; i<argDeq.size(); i++){
ret[i] = (char*)argDeq[i].c_str();
}
ret[argDeq.size()] = NULL;
return ret;
}
std::string Util::Procs::getOutputOf(std::deque<std::string> & argDeq){
std::string ret;
char* const* argv = dequeToArgv(argDeq);//Note: Do not edit deque before executing command
ret = getOutputOf(argv);
return ret;
}
pid_t Util::Procs::StartPiped(std::deque<std::string> & argDeq, int * fdin, int * fdout, int * fderr) {
pid_t ret;
char* const* argv = dequeToArgv(argDeq);//Note: Do not edit deque before executing command
ret = Util::Procs::StartPiped(argv, fdin, fdout, fderr);
return ret;
}
/// Starts a new process with given fds if the name is not already active. /// Starts a new process with given fds if the name is not already active.
/// \return 0 if process was not started, process PID otherwise. /// \return 0 if process was not started, process PID otherwise.
/// \arg argv Command for this process. /// \arg argv Command for this process.

View file

@ -6,6 +6,7 @@
#include <string> #include <string>
#include <set> #include <set>
#include <vector> #include <vector>
#include <deque>
/// Contains utility code, not directly related to streaming media /// Contains utility code, not directly related to streaming media
namespace Util { namespace Util {
@ -19,9 +20,12 @@ namespace Util {
static void exit_handler(); static void exit_handler();
static void runCmd(std::string & cmd); static void runCmd(std::string & cmd);
static void setHandler(); static void setHandler();
static char* const* dequeToArgv(std::deque<std::string> & argDeq);
public: public:
static std::string getOutputOf(char * const * argv); static std::string getOutputOf(char * const * argv);
static std::string getOutputOf(std::deque<std::string> & argDeq);
static pid_t StartPiped(char * const * argv, int * fdin, int * fdout, int * fderr); static pid_t StartPiped(char * const * argv, int * fdin, int * fdout, int * fderr);
static pid_t StartPiped(std::deque<std::string> & argDeq, int * fdin, int * fdout, int * fderr);
static void Stop(pid_t name); static void Stop(pid_t name);
static void Murder(pid_t name); static void Murder(pid_t name);
static void StopAll(); static void StopAll();

View file

@ -12,6 +12,7 @@
#include "shared_memory.h" #include "shared_memory.h"
#include "stream.h" #include "stream.h"
#include "procs.h" #include "procs.h"
#include "bitfields.h"
namespace IPC { namespace IPC {
@ -1025,5 +1026,43 @@ namespace IPC {
} }
return (myPage.mapped + offsetOnPage + (hasCounter ? 1 : 0)); return (myPage.mapped + offsetOnPage + (hasCounter ? 1 : 0));
} }
userConnection::userConnection(char * _data) {
data = _data;
}
unsigned long userConnection::getTrackId(size_t offset) const {
if (offset >= SIMUL_TRACKS){
WARN_MSG("Trying to get track id for entry %lu, while there are only %d entries allowed", offset, SIMUL_TRACKS);
return 0;
}
return Bit::btohl(data + (offset * 6));
}
void userConnection::setTrackId(size_t offset, unsigned long trackId) const {
if (offset >= SIMUL_TRACKS){
WARN_MSG("Trying to set track id for entry %lu, while there are only %d entries allowed", offset, SIMUL_TRACKS);
return;
}
Bit::htobl(data + (offset * 6), trackId);
}
unsigned long userConnection::getKeynum(size_t offset) const {
if (offset >= SIMUL_TRACKS){
WARN_MSG("Trying to get keynum for entry %lu, while there are only %d entries allowed", offset, SIMUL_TRACKS);
return 0;
}
return Bit::btohs(data + (offset * 6) + 4);
}
void userConnection::setKeynum(size_t offset, unsigned long keynum) {
if (offset >= SIMUL_TRACKS){
WARN_MSG("Trying to set keynum for entry %lu, while there are only %d entries allowed", offset, SIMUL_TRACKS);
return;
}
Bit::htobs(data + (offset * 6) + 4, keynum);
}
} }

View file

@ -228,4 +228,15 @@ namespace IPC {
///\brief Whether the payload has a counter, if so, it is added in front of the payload ///\brief Whether the payload has a counter, if so, it is added in front of the payload
bool hasCounter; bool hasCounter;
}; };
class userConnection {
public:
userConnection(char * _data);
unsigned long getTrackId(size_t offset) const;
void setTrackId(size_t offset, unsigned long trackId) const;
unsigned long getKeynum(size_t offset) const;
void setKeynum(size_t offset, unsigned long keynum);
private:
char * data;
};
} }

View file

@ -25,6 +25,11 @@ namespace TS {
clear(); clear();
} }
Packet::Packet(const Packet & rhs){
memcpy(strBuf, rhs.strBuf, 188);
pos = 188;
}
/// This function fills a Packet from a file. /// This function fills a Packet from a file.
/// It fills the content with the next 188 bytes int he file. /// It fills the content with the next 188 bytes int he file.
/// \param Data The data to be read into the packet. /// \param Data The data to be read into the packet.
@ -34,11 +39,11 @@ namespace TS {
if (!fread((void *)strBuf, 188, 1, data)) { if (!fread((void *)strBuf, 188, 1, data)) {
return false; return false;
} }
pos=188;
if (strBuf[0] != 0x47){ if (strBuf[0] != 0x47){
INFO_MSG("Failed to read a good packet on pos %lld", pos); INFO_MSG("Failed to read a good packet on pos %lld", pos);
return false; return false;
} }
pos=188;
return true; return true;
} }
@ -416,7 +421,7 @@ namespace TS {
/// \return A character pointer to the internal packet buffer data /// \return A character pointer to the internal packet buffer data
const char * Packet::checkAndGetBuffer() const{ const char * Packet::checkAndGetBuffer() const{
if (pos != 188) { if (pos != 188) {
DEBUG_MSG(DLVL_ERROR, "Size invalid (%d) - invalid data from this point on", pos); DEBUG_MSG(DLVL_HIGH, "Size invalid (%d) - invalid data from this point on", pos);
} }
return strBuf; return strBuf;
} }
@ -568,6 +573,11 @@ namespace TS {
} }
ProgramAssociationTable & ProgramAssociationTable::operator = (const Packet & rhs){
memcpy(strBuf, rhs.checkAndGetBuffer(), 188);
pos = 188;
return *this;
}
///Retrieves the current addStuffingoffset value for a PAT ///Retrieves the current addStuffingoffset value for a PAT
char ProgramAssociationTable::getOffset() const{ char ProgramAssociationTable::getOffset() const{
unsigned int loc = 4 + (getAdaptationField() > 1 ? getAdaptationFieldLen() + 1 : 0); unsigned int loc = 4 + (getAdaptationField() > 1 ? getAdaptationFieldLen() + 1 : 0);
@ -753,6 +763,12 @@ namespace TS {
pos=4; pos=4;
} }
ProgramMappingTable & ProgramMappingTable::operator = (const Packet & rhs) {
memcpy(strBuf, rhs.checkAndGetBuffer(), 188);
pos = 188;
return *this;
}
char ProgramMappingTable::getOffset() const{ char ProgramMappingTable::getOffset() const{
unsigned int loc = 4 + (getAdaptationField() > 1 ? getAdaptationFieldLen() + 1 : 0); unsigned int loc = 4 + (getAdaptationField() > 1 ? getAdaptationFieldLen() + 1 : 0);
return strBuf[loc]; return strBuf[loc];

View file

@ -22,6 +22,7 @@ namespace TS {
public: public:
//Constructors and fillers //Constructors and fillers
Packet(); Packet();
Packet(const Packet & rhs);
~Packet(); ~Packet();
bool FromPointer(const char * data); bool FromPointer(const char * data);
bool FromFile(FILE * data); bool FromFile(FILE * data);
@ -83,6 +84,7 @@ namespace TS {
class ProgramAssociationTable : public Packet { class ProgramAssociationTable : public Packet {
public: public:
ProgramAssociationTable & operator = (const Packet & rhs);
char getOffset() const; char getOffset() const;
char getTableId() const; char getTableId() const;
short getSectionLength() const; short getSectionLength() const;
@ -119,6 +121,7 @@ namespace TS {
class ProgramMappingTable : public Packet { class ProgramMappingTable : public Packet {
public: public:
ProgramMappingTable(); ProgramMappingTable();
ProgramMappingTable & operator = (const Packet & rhs);
char getOffset() const; char getOffset() const;
void setOffset(char newVal); void setOffset(char newVal);
char getTableId() const; char getTableId() const;

324
lib/ts_stream.cpp Normal file
View file

@ -0,0 +1,324 @@
#include "ts_stream.h"
#include "defines.h"
#include "h264.h"
#include "nal.h"
#include "mp4_generic.h"
namespace TS {
void Stream::parse(char * newPack, unsigned long long bytePos) {
Packet newPacket;
newPacket.FromPointer(newPack);
parse(newPacket, bytePos);
}
void Stream::clear(){
pesStreams.clear();
pesPositions.clear();
payloadSize.clear();
outPackets.clear();
}
void Stream::parse(Packet & newPack, unsigned long long bytePos) {
int tid = newPack.getPID();
if (tid == 0){
associationTable = newPack;
return;
}
//If we are here, the packet is not a PAT.
//First check if it is listed in the PAT as a PMT track.
int pmtCount = associationTable.getProgramCount();
for (int i = 0; i < pmtCount; i++){
if (tid == associationTable.getProgramPID(i)){
mappingTable[tid] = newPack;
ProgramMappingEntry entry = mappingTable[tid].getEntry(0);
while (entry){
unsigned long pid = entry.getElementaryPid();
pidToCodec[pid] = entry.getStreamType();
entry.advance();
}
return;
}
}
//If it is not a PMT, check the list of all PMTs to see if this is a new PES track.
bool inPMT = false;
for (std::map<unsigned long, ProgramMappingTable>::iterator it = mappingTable.begin(); it!= mappingTable.end(); it++){
ProgramMappingEntry entry = it->second.getEntry(0);
while (entry){
if (tid == entry.getElementaryPid()){
inPMT = true;
break;
}
entry.advance();
}
if (inPMT){
break;
}
}
if (!inPMT){
HIGH_MSG("Encountered a packet on track %d, but the track is not registered in any PMT", tid);
return;
}
pesStreams[tid].push_back(newPack);
pesPositions[tid].push_back(bytePos);
if (!newPack.getUnitStart() || pesStreams[tid].size() == 1){
payloadSize[tid] += newPack.getPayloadLength();
}
parsePES(tid);
}
bool Stream::hasPacketOnEachTrack() const {
if (!pidToCodec.size()){
return false;
}
for (std::map<unsigned long, unsigned long>::const_iterator it = pidToCodec.begin(); it != pidToCodec.end(); it++){
if (!outPackets.count(it->first) || !outPackets.at(it->first).size()){
return false;
}
}
return true;
}
bool Stream::hasPacket(unsigned long tid) const {
if (!pesStreams.count(tid)){
return false;
}
if (outPackets.count(tid) && outPackets.at(tid).size()){
return true;
}
for (int i = 1; i < pesStreams.find(tid)->second.size(); i++) {
if (pesStreams.find(tid)->second.at(i).getUnitStart()) {
return true;
}
}
return false;
}
unsigned long long decodePTS(const char * data){
unsigned long long time;
time = ((data[0] >> 1) & 0x07);
time <<= 15;
time |= ((int)data[1] << 7) | ((data[2] >> 1) & 0x7F);
time <<= 15;
time |= ((int)data[3] << 7) | ((data[4] >> 1) & 0x7F);
time /= 90;
return time;
}
void Stream::parsePES(unsigned long tid){
std::deque<Packet> & inStream = pesStreams[tid];
if (inStream.size() == 1){
return;
}
if (!inStream.back().getUnitStart()){
return;
}
unsigned long long bPos = pesPositions[tid].front();
//Create a buffer for the current PES, and remove it from the pesStreams buffer.
int paySize = payloadSize[tid];
char * payload = (char*)malloc(paySize);
int offset = 0;
while (inStream.size() != 1){
memcpy(payload + offset, inStream.front().getPayload(), inStream.front().getPayloadLength());
offset += inStream.front().getPayloadLength();
inStream.pop_front();
pesPositions[tid].pop_front();
}
//Parse the PES header
offset = 0;
while(offset < paySize){
const char * pesHeader = payload + offset;
//Check for large enough buffer
if ((paySize - offset) < 9 || (paySize - offset) < 9 + pesHeader[8]){
INFO_MSG("Not enough data on track %lu, discarding remainder of data", tid);
break;
}
//Check for valid PES lead-in
if(pesHeader[0] != 0 || pesHeader[1] != 0x00 || pesHeader[2] != 0x01){
INFO_MSG("Invalid PES Lead in on track %lu, discarding it", tid);
break;
}
//Read the payload size.
//Note: if the payload size is 0, then we assume the pes packet will cover the entire TS Unit.
//Note: this is technically only allowed for video pes streams.
unsigned long long realPayloadSize = (((int)pesHeader[4] << 8) | pesHeader[5]);
if (!realPayloadSize){
realPayloadSize = paySize;
}
if (pidToCodec[tid] == AAC){
realPayloadSize -= (3 + pesHeader[8]);
}else{
realPayloadSize -= (9 + pesHeader[8]);
}
//Read the metadata for this PES Packet
///\todo Determine keyframe-ness
unsigned int timeStamp = 0;
unsigned int timeOffset = 0;
unsigned int pesOffset = 9;
if ((pesHeader[7] >> 6) & 0x02){//Check for PTS presence
timeStamp = decodePTS(pesHeader + pesOffset);
pesOffset += 5;
if (((pesHeader[7] & 0xC0) >> 6) & 0x01){//Check for DTS presence (yes, only if PTS present)
timeOffset = timeStamp;
timeStamp = decodePTS(pesHeader + pesOffset);
pesOffset += 5;
timeOffset -= timeStamp;
}
}
if (paySize - offset - pesOffset < realPayloadSize){
INFO_MSG("Not enough data left on track %lu.", tid);
break;
}
char * pesPayload = payload + offset + pesOffset;
//Create a new (empty) DTSC Packet at the end of the buffer
if (pidToCodec[tid] == AAC){
//Parse all the ADTS packets
unsigned long offsetInPes = 0;
unsigned long samplesRead = 0;
while (offsetInPes < realPayloadSize){
outPackets[tid].push_back(DTSC::Packet());
aac::adts adtsPack(pesPayload + offsetInPes, realPayloadSize - offsetInPes);
if (!adtsInfo.count(tid)){
adtsInfo[tid] = adtsPack;
}
outPackets[tid].back().genericFill(timeStamp + ((samplesRead * 1000) / adtsPack.getFrequency()), timeOffset, tid, adtsPack.getPayload(), adtsPack.getPayloadSize(), bPos, 0);
samplesRead += adtsPack.getSampleCount();
offsetInPes += adtsPack.getHeaderSize() + adtsPack.getPayloadSize();
}
}
if (pidToCodec[tid] == H264){
//Convert from annex b
char * parsedData = NULL;
bool isKeyFrame = false;
unsigned long parsedSize = h264::fromAnnexB(pesPayload, realPayloadSize, parsedData);
std::deque<h264::nalData> nalInfo = h264::analyseH264Packet(parsedData, parsedSize);
int dataOffset = 0;
for (std::deque<h264::nalData>::iterator it = nalInfo.begin(); it != nalInfo.end(); it++){
switch (it->nalType){
case 0x05: {
isKeyFrame = true;
break;
}
case 0x07: {
spsInfo[tid] = std::string(parsedData + dataOffset + 4, it->nalSize);
break;
}
case 0x08: {
ppsInfo[tid] = std::string(parsedData + dataOffset + 4, it->nalSize);
break;
}
default: break;
}
dataOffset += 4 + it->nalSize;
}
outPackets[tid].push_back(DTSC::Packet());
outPackets[tid].back().genericFill(timeStamp, timeOffset, tid, parsedData, parsedSize, bPos, isKeyFrame);
free(parsedData);
}
//We are done with the realpayload size, reverse calculation so we know the correct offset increase.
if (pidToCodec[tid] == AAC){
realPayloadSize += (3 + pesHeader[8]);
}else{
realPayloadSize += (9 + pesHeader[8]);
}
offset += realPayloadSize;
}
free(payload);
payloadSize[tid] = inStream.front().getPayloadLength();
}
void Stream::getPacket(unsigned long tid, DTSC::Packet & pack) {
pack.null();
if (!hasPacket(tid)){
ERROR_MSG("Trying to obtain a packet on track %lu, but no full packet is available", tid);
return;
}
//Handle the situation where we have DTSC Packets buffered
if (outPackets[tid].size()){
pack = outPackets[tid].front();
outPackets[tid].pop_front();
if (!outPackets[tid].size()){
payloadSize[tid] = 0;
for (std::deque<Packet>::iterator it = pesStreams[tid].begin(); it != pesStreams[tid].end(); it++){
//Break this loop on the second TS Packet with the UnitStart flag set, not on the first.
if (it->getUnitStart() && it != pesStreams[tid].begin()){
break;
}
payloadSize[tid] += it->getPayloadLength();
}
}
return;
}
}
void Stream::getEarliestPacket(DTSC::Packet & pack){
pack.null();
if (!hasPacketOnEachTrack()){
return;
}
unsigned long packTime = 0xFFFFFFFFull;
unsigned long packTrack = 0;
for (std::map<unsigned long, std::deque<DTSC::Packet> >::iterator it = outPackets.begin(); it != outPackets.end(); it++){
if (it->second.front().getTime() < packTime){
packTrack = it->first;
packTime = it->second.front().getTime();
}
}
pack = outPackets[packTrack].front();
outPackets[packTrack].pop_front();
}
void Stream::initializeMetadata(DTSC::Meta & meta) {
for (std::map<unsigned long, unsigned long>::const_iterator it = pidToCodec.begin(); it != pidToCodec.end(); it++){
if (!meta.tracks.count(it->first) && it->second == H264){
if (!spsInfo.count(it->first) || !ppsInfo.count(it->first)){
continue;
}
meta.tracks[it->first].type = "video";
meta.tracks[it->first].codec = "H264";
meta.tracks[it->first].trackID = it->first;
std::string tmpBuffer = spsInfo[it->first];
h264::sequenceParameterSet sps(spsInfo[it->first].data(), spsInfo[it->first].size());
h264::SPSMeta spsChar = sps.getCharacteristics();
meta.tracks[it->first].width = spsChar.width;
meta.tracks[it->first].height = spsChar.height;
meta.tracks[it->first].fpks = spsChar.fps * 1000;
MP4::AVCC avccBox;
avccBox.setVersion(1);
avccBox.setProfile(spsInfo[it->first][1]);
avccBox.setCompatibleProfiles(spsInfo[it->first][2]);
avccBox.setLevel(spsInfo[it->first][3]);
avccBox.setSPSNumber(1);
avccBox.setSPS(spsInfo[it->first]);
avccBox.setPPSNumber(1);
avccBox.setPPS(ppsInfo[it->first]);
meta.tracks[it->first].init = std::string(avccBox.payload(), avccBox.payloadSize());
INFO_MSG("Initialized metadata for track %lu, with an SPS of %lu bytes, and a PPS of %lu bytes", it->first, spsInfo[it->first].size(), ppsInfo[it->first].size());
}
if (!meta.tracks.count(it->first) && it->second == AAC){
meta.tracks[it->first].type = "audio";
meta.tracks[it->first].codec = "AAC";
meta.tracks[it->first].trackID = it->first;
meta.tracks[it->first].size = 16;
meta.tracks[it->first].rate = adtsInfo[it->first].getFrequency();
meta.tracks[it->first].channels = adtsInfo[it->first].getChannelCount();
char audioInit[2];//5 bits object type, 4 bits frequency index, 4 bits channel index
audioInit[0] = ((adtsInfo[it->first].getAACProfile() & 0x1F) << 3) | ((adtsInfo[it->first].getFrequencyIndex() & 0x0E) >> 1);
audioInit[1] = ((adtsInfo[it->first].getFrequencyIndex() & 0x01) << 7) | ((adtsInfo[it->first].getChannelConfig() & 0x0F) << 3);
meta.tracks[it->first].init = std::string(audioInit, 2);
}
}
}
}

37
lib/ts_stream.h Normal file
View file

@ -0,0 +1,37 @@
#include "ts_packet.h"
#include "adts.h"
#include <map>
#include <deque>
namespace TS {
enum codecType {
H264 = 0x1B,
AAC = 0x0F,
AC3 = 0x81
};
class Stream{
public:
void parse(Packet & newPack, unsigned long long bytePos);
void parse(char * newPack, unsigned long long bytePos);
bool hasPacketOnEachTrack() const;
bool hasPacket(unsigned long tid) const;
void getPacket(unsigned long tid, DTSC::Packet & pack);
void getEarliestPacket(DTSC::Packet & pack);
void initializeMetadata(DTSC::Meta & meta);
void clear();
private:
ProgramAssociationTable associationTable;
std::map<unsigned long, ProgramMappingTable> mappingTable;
std::map<unsigned long, std::deque<Packet> > pesStreams;
std::map<unsigned long, std::deque<unsigned long long> > pesPositions;
std::map<unsigned long, unsigned long> payloadSize;
std::map<unsigned long, std::deque<DTSC::Packet> > outPackets;
std::map<unsigned long, unsigned long> pidToCodec;
std::map<unsigned long, aac::adts > adtsInfo;
std::map<unsigned long, std::string > spsInfo;
std::map<unsigned long, std::string > ppsInfo;
void parsePES(unsigned long tid);
};
}

View file

@ -0,0 +1,54 @@
#include <fcntl.h>
#include <iostream>
#include <map>
#include <iomanip>
#include <string>
#include <cstdlib>
#include <cstdio>
#include <string.h>
#include <fstream>
#include <unistd.h>
#include <sstream>
#include <signal.h>
#include <mist/ts_packet.h>
#include <mist/ts_stream.h>
#include <mist/config.h>
namespace Analysers {
/// Debugging tool for TS data.
/// Expects TS data through stdin, outputs human-readable information to stderr.
/// \return The return code of the analyser.
int analyseTS(bool validate, bool analyse, int detailLevel){
TS::Stream tsStream;
std::map<unsigned long long, std::string> payloads;
TS::Packet packet;
long long int upTime = Util::bootSecs();
int64_t pcr = 0;
unsigned int bytes = 0;
char packetPtr[188];
while (std::cin.good()){
std::cin.read(packetPtr,188);
if(std::cin.gcount() != 188){break;}
bytes += 188;
if(packet.FromPointer(packetPtr)){
tsStream.parse(packet, bytes);
if (tsStream.hasPacketOnEachTrack()){
DTSC::Packet dtscPack;
tsStream.getEarliestPacket(dtscPack);
std::cout << dtscPack.toJSON().toPrettyString();
}
}
}
return 0;
}
}
int main(int argc, char ** argv){
Util::Config conf = Util::Config(argv[0], PACKAGE_VERSION);
conf.addOption("analyse", JSON::fromString("{\"long\":\"analyse\", \"short\":\"a\", \"default\":1, \"long_off\":\"notanalyse\", \"short_off\":\"b\", \"help\":\"Analyse a file's contents (-a), or don't (-b) returning false on error. Default is analyse.\"}"));
conf.addOption("validate", JSON::fromString("{\"long\":\"validate\", \"short\":\"V\", \"default\":0, \"long_off\":\"notvalidate\", \"short_off\":\"X\", \"help\":\"Validate (-V) the file contents or don't validate (-X) its integrity, returning false on error. Default is don't validate.\"}"));
conf.addOption("detail", JSON::fromString("{\"long\":\"detail\", \"short\":\"D\", \"arg\":\"num\", \"default\":3, \"help\":\"Detail level of analysis.\"}"));
conf.parseArgs(argc, argv);
return Analysers::analyseTS(conf.getBool("validate"),conf.getBool("analyse"),conf.getInteger("detail"));
}

View file

@ -244,16 +244,16 @@ int Controller::handleAPIConnection(Socket::Connection & conn){
//if object, delete all entries //if object, delete all entries
//if string, delete just the one //if string, delete just the one
if (Request["deletestream"].isString()){ if (Request["deletestream"].isString()){
Controller::Storage["streams"].removeMember(Request["deletestream"].asStringRef()); Controller::deleteStream(Request["deletestream"].asStringRef(), Controller::Storage["streams"]);
} }
if (Request["deletestream"].isArray()){ if (Request["deletestream"].isArray()){
for (JSON::ArrIter it = Request["deletestream"].ArrBegin(); it != Request["deletestream"].ArrEnd(); ++it){ for (JSON::ArrIter it = Request["deletestream"].ArrBegin(); it != Request["deletestream"].ArrEnd(); ++it){
Controller::Storage["streams"].removeMember(it->asString()); Controller::deleteStream(it->asStringRef(), Controller::Storage["streams"]);
} }
} }
if (Request["deletestream"].isObject()){ if (Request["deletestream"].isObject()){
for (JSON::ObjIter it = Request["deletestream"].ObjBegin(); it != Request["deletestream"].ObjEnd(); ++it){ for (JSON::ObjIter it = Request["deletestream"].ObjBegin(); it != Request["deletestream"].ObjEnd(); ++it){
Controller::Storage["streams"].removeMember(it->first); Controller::deleteStream(it->first, Controller::Storage["streams"]);
} }
} }
} }

View file

@ -15,6 +15,7 @@
///\brief Holds everything unique to the controller. ///\brief Holds everything unique to the controller.
namespace Controller { namespace Controller {
std::map<std::string, pid_t> inputProcesses;
///\brief Checks whether two streams are equal. ///\brief Checks whether two streams are equal.
///\param one The first stream for the comparison. ///\param one The first stream for the comparison.
@ -58,6 +59,31 @@ namespace Controller {
} }
if (URL.substr(0, 1) != "/"){ if (URL.substr(0, 1) != "/"){
//push-style stream //push-style stream
if (data["udpport"].asInt()){
std::string udpPort = data["udpport"].asString();
//Check running
if (!inputProcesses.count(name) || !Util::Procs::isRunning(inputProcesses[name])){
// False: start TS input
INFO_MSG("No TS Input running on port %s for stream %s, starting it", udpPort.c_str(), name.c_str());
std::deque<std::string> command;
command.push_back(Util::getMyPath() + "MistInTSStream");
command.push_back("-s");
command.push_back(name);
command.push_back("-p");
command.push_back(udpPort);
command.push_back(URL);
int stdIn = 0;
int stdOut = 1;
int stdErr = 2;
pid_t program = Util::Procs::StartPiped(command, &stdIn, &stdOut, &stdErr);
if (program){
inputProcesses[name] = program;
}
}
//Check hasViewers
// True: data["online"] = 2;
// False: data["online"] =11;
}
return; return;
} }
if (URL.substr(0, 1) == "/"){ if (URL.substr(0, 1) == "/"){
@ -204,13 +230,12 @@ namespace Controller {
for (JSON::ObjIter jit = out.ObjBegin(); jit != out.ObjEnd(); jit++){ for (JSON::ObjIter jit = out.ObjBegin(); jit != out.ObjEnd(); jit++){
if ( !in.isMember(jit->first)){ if ( !in.isMember(jit->first)){
toDelete.insert(jit->first); toDelete.insert(jit->first);
Log("STRM", std::string("Deleted stream ") + jit->first);
} }
} }
//actually delete the streams //actually delete the streams
while (toDelete.size() > 0){ while (toDelete.size() > 0){
std::string deleting = *(toDelete.begin()); std::string deleting = *(toDelete.begin());
out.removeMember(deleting); deleteStream(deleting, out);
toDelete.erase(deleting); toDelete.erase(deleting);
} }
@ -229,4 +254,19 @@ namespace Controller {
} }
void deleteStream(const std::string & name, JSON::Value & out) {
if (!out.isMember(name)){
return;
}
Log("STRM", std::string("Deleted stream ") + name);
out.removeMember(name);
if (inputProcesses.count(name)){
pid_t procId = inputProcesses[name];
if (Util::Procs::isRunning(procId)){
Util::Procs::Stop(procId);
}
inputProcesses.erase(name);
}
}
} //Controller namespace } //Controller namespace

View file

@ -6,9 +6,11 @@ namespace Controller {
bool CheckAllStreams(JSON::Value & data); bool CheckAllStreams(JSON::Value & data);
void CheckStreams(JSON::Value & in, JSON::Value & out); void CheckStreams(JSON::Value & in, JSON::Value & out);
void AddStreams(JSON::Value & in, JSON::Value & out); void AddStreams(JSON::Value & in, JSON::Value & out);
void deleteStream(const std::string & name, JSON::Value & out);
struct liveCheck { struct liveCheck {
long long int lastms; long long int lastms;
long long int last_active; long long int last_active;
}; };
} //Controller namespace } //Controller namespace

View file

@ -2,6 +2,7 @@
#include <fcntl.h> #include <fcntl.h>
#include <sys/stat.h> #include <sys/stat.h>
#include <mist/stream.h>
#include <mist/defines.h> #include <mist/defines.h>
#include "input.h" #include "input.h"
#include <sstream> #include <sstream>
@ -10,25 +11,29 @@
namespace Mist { namespace Mist {
Input * Input::singleton = NULL; Input * Input::singleton = NULL;
void Input::userCallback(char * data, size_t len, unsigned int id){ void Input::userCallback(char * data, size_t len, unsigned int id) {
for (int i = 0; i < 5; i++){ for (int i = 0; i < 5; i++) {
unsigned long tid = ((unsigned long)(data[i*6]) << 24) | ((unsigned long)(data[i*6+1]) << 16) | ((unsigned long)(data[i*6+2]) << 8) | ((unsigned long)(data[i*6+3])); unsigned long tid = ((unsigned long)(data[i * 6]) << 24) | ((unsigned long)(data[i * 6 + 1]) << 16) | ((unsigned long)(data[i * 6 + 2]) << 8) | ((unsigned long)(data[i * 6 + 3]));
if (tid){ if (tid) {
unsigned long keyNum = ((unsigned long)(data[i*6+4]) << 8) | ((unsigned long)(data[i*6+5])); unsigned long keyNum = ((unsigned long)(data[i * 6 + 4]) << 8) | ((unsigned long)(data[i * 6 + 5]));
bufferFrame(tid, keyNum + 1);//Try buffer next frame bufferFrame(tid, keyNum + 1);//Try buffer next frame
} }
} }
} }
void Input::callbackWrapper(char * data, size_t len, unsigned int id){ void Input::callbackWrapper(char * data, size_t len, unsigned int id) {
singleton->userCallback(data, 30, id);//call the userCallback for this input singleton->userCallback(data, 30, id);//call the userCallback for this input
} }
Input::Input(Util::Config * cfg) : InOutBase() { Input::Input(Util::Config * cfg) : InOutBase() {
config = cfg; config = cfg;
#ifdef INPUT_LIVE
standAlone = false;
#else
standAlone = true; standAlone = true;
#endif
JSON::Value option; JSON::Value option;
option["long"] = "json"; option["long"] = "json";
option["short"] = "j"; option["short"] = "j";
@ -69,36 +74,35 @@ namespace Mist {
option.null(); option.null();
/*LTS-END*/ /*LTS-END*/
capa["optional"]["debug"]["name"] = "debug"; capa["optional"]["debug"]["name"] = "debug";
capa["optional"]["debug"]["help"] = "The debug level at which messages need to be printed."; capa["optional"]["debug"]["help"] = "The debug level at which messages need to be printed.";
capa["optional"]["debug"]["option"] = "--debug"; capa["optional"]["debug"]["option"] = "--debug";
capa["optional"]["debug"]["type"] = "debug"; capa["optional"]["debug"]["type"] = "debug";
packTime = 0; packTime = 0;
lastActive = Util::epoch(); lastActive = Util::epoch();
playing = 0; playing = 0;
playUntil = 0; playUntil = 0;
singleton = this; singleton = this;
isBuffer = false; isBuffer = false;
} }
void Input::checkHeaderTimes(std::string streamFile){ void Input::checkHeaderTimes(std::string streamFile) {
if ( streamFile == "-" ){ if (streamFile == "-") {
return; return;
} }
std::string headerFile = streamFile + ".dtsh"; std::string headerFile = streamFile + ".dtsh";
FILE * tmp = fopen(headerFile.c_str(),"r"); FILE * tmp = fopen(headerFile.c_str(), "r");
if (tmp == NULL){ if (tmp == NULL) {
DEBUG_MSG(DLVL_HIGH, "Can't open header: %s. Assuming all is fine.", headerFile.c_str() ); DEBUG_MSG(DLVL_HIGH, "Can't open header: %s. Assuming all is fine.", headerFile.c_str());
return; return;
} }
struct stat bufStream; struct stat bufStream;
struct stat bufHeader; struct stat bufHeader;
//fstat(fileno(streamFile), &bufStream); //fstat(fileno(streamFile), &bufStream);
//fstat(fileno(tmp), &bufHeader); //fstat(fileno(tmp), &bufHeader);
if (stat(streamFile.c_str(), &bufStream) !=0 || stat(headerFile.c_str(), &bufHeader) !=0){ if (stat(streamFile.c_str(), &bufStream) != 0 || stat(headerFile.c_str(), &bufHeader) != 0) {
DEBUG_MSG(DLVL_HIGH, "Could not compare stream and header timestamps - assuming all is fine."); DEBUG_MSG(DLVL_HIGH, "Could not compare stream and header timestamps - assuming all is fine.");
fclose(tmp); fclose(tmp);
return; return;
@ -106,36 +110,47 @@ namespace Mist {
int timeStream = bufStream.st_mtime; int timeStream = bufStream.st_mtime;
int timeHeader = bufHeader.st_mtime; int timeHeader = bufHeader.st_mtime;
fclose(tmp); fclose(tmp);
if (timeHeader < timeStream){ if (timeHeader < timeStream) {
//delete filename //delete filename
INFO_MSG("Overwriting outdated DTSH header file: %s ",headerFile.c_str()); INFO_MSG("Overwriting outdated DTSH header file: %s ", headerFile.c_str());
remove(headerFile.c_str()); remove(headerFile.c_str());
} }
} }
int Input::run() { int Input::run() {
if (streamName != "") {
config->getOption("streamname") = streamName;
}
streamName = config->getString("streamname"); streamName = config->getString("streamname");
if (config->getBool("json")) { if (config->getBool("json")) {
std::cout << capa.toString() << std::endl; std::cout << capa.toString() << std::endl;
return 0; return 0;
} }
if (!setup()){ if (!setup()) {
std::cerr << config->getString("cmd") << " setup failed." << std::endl; std::cerr << config->getString("cmd") << " setup failed." << std::endl;
return 0; return 0;
} }
//Do not read the header if this is a live stream
#ifndef INPUT_LIVE
checkHeaderTimes(config->getString("input")); checkHeaderTimes(config->getString("input"));
if (!readHeader()){ if (!readHeader()) {
std::cerr << "Reading header for " << config->getString("input") << " failed." << std::endl; std::cerr << "Reading header for " << config->getString("input") << " failed." << std::endl;
return 0; return 0;
} }
parseHeader(); parseHeader();
#endif
//Live inputs only have a serve() mode
#ifndef INPUT_LIVE
if (!config->getString("streamname").size()){ if (!config->getString("streamname").size()){
convert(); convert();
}else{ }else{
#endif
serve(); serve();
#ifndef INPUT_LIVE
} }
#endif
return 0; return 0;
} }
@ -173,6 +188,35 @@ namespace Mist {
void Input::serve(){ void Input::serve(){
char userPageName[NAME_BUFFER_SIZE]; char userPageName[NAME_BUFFER_SIZE];
snprintf(userPageName, NAME_BUFFER_SIZE, SHM_USERS, streamName.c_str()); snprintf(userPageName, NAME_BUFFER_SIZE, SHM_USERS, streamName.c_str());
#ifdef INPUT_LIVE
Util::startInput(streamName);
userClient = IPC::sharedClient(userPageName, 30, true);
getNext();
while (thisPacket || config->is_active){
unsigned long tid = thisPacket.getTrackId();
//Check for eligibility of track
IPC::userConnection userConn(userClient.getData());
if (trackOffset.count(tid) && !userConn.getTrackId(trackOffset[tid])){
trackOffset.erase(tid);
trackState.erase(tid);
trackMap.erase(tid);
trackBuffer.erase(tid);
pagesByTrack.erase(tid);
metaPages.erase(tid);
curPageNum.erase(tid);
curPage.erase(tid);
INFO_MSG("Erasing track %d", tid);
continue;
}
if (thisPacket){
continueNegotiate(thisPacket.getTrackId());
bufferLivePacket(thisPacket);
}
getNext();
userClient.keepAlive();
}
userClient.finish();
#else
userPage.init(userPageName, PLAY_EX_SIZE, true); userPage.init(userPageName, PLAY_EX_SIZE, true);
if (!isBuffer){ if (!isBuffer){
for (std::map<unsigned int,DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){ for (std::map<unsigned int,DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){
@ -185,8 +229,8 @@ namespace Mist {
long long int activityCounter = Util::bootSecs(); long long int activityCounter = Util::bootSecs();
while ((Util::bootSecs() - activityCounter) < 10 && config->is_active){//10 second timeout while ((Util::bootSecs() - activityCounter) < 10 && config->is_active){//10 second timeout
Util::wait(1000); Util::wait(1000);
removeUnused();
userPage.parseEach(callbackWrapper); userPage.parseEach(callbackWrapper);
removeUnused();
if (userPage.amount){ if (userPage.amount){
activityCounter = Util::bootSecs(); activityCounter = Util::bootSecs();
DEBUG_MSG(DLVL_INSANE, "Connected users: %d", userPage.amount); DEBUG_MSG(DLVL_INSANE, "Connected users: %d", userPage.amount);
@ -194,40 +238,41 @@ namespace Mist {
DEBUG_MSG(DLVL_INSANE, "Timer running"); DEBUG_MSG(DLVL_INSANE, "Timer running");
} }
} }
#endif
finish(); finish();
DEBUG_MSG(DLVL_DEVEL,"Input for stream %s closing clean", streamName.c_str()); DEBUG_MSG(DLVL_DEVEL,"Input for stream %s closing clean", streamName.c_str());
//end player functionality //end player functionality
} }
void Input::finish(){ void Input::finish() {
for( std::map<unsigned int, std::map<unsigned int, unsigned int> >::iterator it = pageCounter.begin(); it != pageCounter.end(); it++){ for (std::map<unsigned int, std::map<unsigned int, unsigned int> >::iterator it = pageCounter.begin(); it != pageCounter.end(); it++) {
for (std::map<unsigned int, unsigned int>::iterator it2 = it->second.begin(); it2 != it->second.end(); it2++){ for (std::map<unsigned int, unsigned int>::iterator it2 = it->second.begin(); it2 != it->second.end(); it2++) {
it2->second = 1; it2->second = 1;
} }
} }
removeUnused(); removeUnused();
if (standAlone){ if (standAlone) {
for (std::map<unsigned long, IPC::sharedPage>::iterator it = metaPages.begin(); it != metaPages.end(); it++){ for (std::map<unsigned long, IPC::sharedPage>::iterator it = metaPages.begin(); it != metaPages.end(); it++) {
it->second.master = true; it->second.master = true;
} }
} }
} }
void Input::removeUnused(){ void Input::removeUnused() {
for (std::map<unsigned int, std::map<unsigned int, unsigned int> >::iterator it = pageCounter.begin(); it != pageCounter.end(); it++){ for (std::map<unsigned int, std::map<unsigned int, unsigned int> >::iterator it = pageCounter.begin(); it != pageCounter.end(); it++) {
for (std::map<unsigned int, unsigned int>::iterator it2 = it->second.begin(); it2 != it->second.end(); it2++){ for (std::map<unsigned int, unsigned int>::iterator it2 = it->second.begin(); it2 != it->second.end(); it2++) {
it2->second--; it2->second--;
} }
bool change = true; bool change = true;
while (change){ while (change) {
change = false; change = false;
for (std::map<unsigned int, unsigned int>::iterator it2 = it->second.begin(); it2 != it->second.end(); it2++){ for (std::map<unsigned int, unsigned int>::iterator it2 = it->second.begin(); it2 != it->second.end(); it2++) {
if (!it2->second){ if (!it2->second) {
bufferRemove(it->first, it2->first); bufferRemove(it->first, it2->first);
pageCounter[it->first].erase(it2->first); pageCounter[it->first].erase(it2->first);
for (int i = 0; i < 8192; i += 8){ for (int i = 0; i < 8192; i += 8) {
unsigned int thisKeyNum = ntohl(((((long long int *)(metaPages[it->first].mapped + i))[0]) >> 32) & 0xFFFFFFFF); unsigned int thisKeyNum = ntohl(((((long long int *)(metaPages[it->first].mapped + i))[0]) >> 32) & 0xFFFFFFFF);
if (thisKeyNum == it2->first){ if (thisKeyNum == it2->first) {
(((long long int *)(metaPages[it->first].mapped + i))[0]) = 0; (((long long int *)(metaPages[it->first].mapped + i))[0]) = 0;
} }
} }
@ -238,106 +283,106 @@ namespace Mist {
} }
} }
} }
void Input::parseHeader(){ void Input::parseHeader() {
DEBUG_MSG(DLVL_DONTEVEN,"Parsing the header"); DEBUG_MSG(DLVL_DONTEVEN, "Parsing the header");
selectedTracks.clear(); selectedTracks.clear();
std::stringstream trackSpec; std::stringstream trackSpec;
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++) { for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++) {
DEBUG_MSG(DLVL_VERYHIGH, "Track %u encountered", it->first); DEBUG_MSG(DLVL_VERYHIGH, "Track %u encountered", it->first);
if (trackSpec.str() != ""){ if (trackSpec.str() != "") {
trackSpec << " "; trackSpec << " ";
} }
trackSpec << it->first; trackSpec << it->first;
DEBUG_MSG(DLVL_VERYHIGH, "Trackspec now %s", trackSpec.str().c_str()); DEBUG_MSG(DLVL_VERYHIGH, "Trackspec now %s", trackSpec.str().c_str());
for (std::deque<DTSC::Key>::iterator it2 = it->second.keys.begin(); it2 != it->second.keys.end(); it2++){ for (std::deque<DTSC::Key>::iterator it2 = it->second.keys.begin(); it2 != it->second.keys.end(); it2++) {
keyTimes[it->first].insert(it2->getTime()); keyTimes[it->first].insert(it2->getTime());
} }
} }
trackSelect(trackSpec.str()); trackSelect(trackSpec.str());
bool hasKeySizes = true; bool hasKeySizes = true;
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){ for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++) {
if (!it->second.keySizes.size()){ if (!it->second.keySizes.size()) {
hasKeySizes = false; hasKeySizes = false;
break; break;
} }
} }
if (hasKeySizes){ if (hasKeySizes) {
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){ for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++) {
bool newData = true; bool newData = true;
for (int i = 0; i < it->second.keys.size(); i++){ for (int i = 0; i < it->second.keys.size(); i++) {
if (newData){ if (newData) {
//i+1 because keys are 1-indexed //i+1 because keys are 1-indexed
pagesByTrack[it->first][i+1].firstTime = it->second.keys[i].getTime(); pagesByTrack[it->first][i + 1].firstTime = it->second.keys[i].getTime();
newData = false; newData = false;
} }
pagesByTrack[it->first].rbegin()->second.keyNum++; pagesByTrack[it->first].rbegin()->second.keyNum++;
pagesByTrack[it->first].rbegin()->second.partNum += it->second.keys[i].getParts(); pagesByTrack[it->first].rbegin()->second.partNum += it->second.keys[i].getParts();
pagesByTrack[it->first].rbegin()->second.dataSize += it->second.keySizes[i]; pagesByTrack[it->first].rbegin()->second.dataSize += it->second.keySizes[i];
if (pagesByTrack[it->first].rbegin()->second.dataSize > FLIP_DATA_PAGE_SIZE){ if (pagesByTrack[it->first].rbegin()->second.dataSize > FLIP_DATA_PAGE_SIZE) {
newData = true; newData = true;
} }
} }
} }
}else{ } else {
std::map<int, DTSCPageData> curData; std::map<int, DTSCPageData> curData;
std::map<int, booking> bookKeeping; std::map<int, booking> bookKeeping;
seek(0);
getNext();
while(thisPacket){//loop through all seek(0);
unsigned int tid = thisPacket.getTrackId(); getNext();
if (!tid){
getNext(false);
continue;
}
if (!bookKeeping.count(tid)){
bookKeeping[tid].first = 1;
bookKeeping[tid].curPart = 0;
bookKeeping[tid].curKey = 0;
curData[tid].lastKeyTime = 0xFFFFFFFF;
curData[tid].keyNum = 1;
curData[tid].partNum = 0;
curData[tid].dataSize = 0;
curData[tid].curOffset = 0;
curData[tid].firstTime = myMeta.tracks[tid].keys[0].getTime();
} while (thisPacket) { //loop through all
if (myMeta.tracks[tid].keys[bookKeeping[tid].curKey].getParts() + 1 == curData[tid].partNum){ unsigned int tid = thisPacket.getTrackId();
if (curData[tid].dataSize > FLIP_DATA_PAGE_SIZE) { if (!tid) {
pagesByTrack[tid][bookKeeping[tid].first] = curData[tid]; getNext(false);
bookKeeping[tid].first += curData[tid].keyNum; continue;
curData[tid].keyNum = 0; }
curData[tid].dataSize = 0; if (!bookKeeping.count(tid)) {
curData[tid].firstTime = myMeta.tracks[tid].keys[bookKeeping[tid].curKey].getTime(); bookKeeping[tid].first = 1;
bookKeeping[tid].curPart = 0;
bookKeeping[tid].curKey = 0;
curData[tid].lastKeyTime = 0xFFFFFFFF;
curData[tid].keyNum = 1;
curData[tid].partNum = 0;
curData[tid].dataSize = 0;
curData[tid].curOffset = 0;
curData[tid].firstTime = myMeta.tracks[tid].keys[0].getTime();
}
if (myMeta.tracks[tid].keys[bookKeeping[tid].curKey].getParts() + 1 == curData[tid].partNum) {
if (curData[tid].dataSize > FLIP_DATA_PAGE_SIZE) {
pagesByTrack[tid][bookKeeping[tid].first] = curData[tid];
bookKeeping[tid].first += curData[tid].keyNum;
curData[tid].keyNum = 0;
curData[tid].dataSize = 0;
curData[tid].firstTime = myMeta.tracks[tid].keys[bookKeeping[tid].curKey].getTime();
}
bookKeeping[tid].curKey++;
curData[tid].keyNum++;
curData[tid].partNum = 0;
}
curData[tid].dataSize += thisPacket.getDataLen();
curData[tid].partNum ++;
bookKeeping[tid].curPart ++;
DEBUG_MSG(DLVL_DONTEVEN, "Track %ld:%llu on page %d@%llu (len:%d), being part %lu of key %lu", thisPacket.getTrackId(), thisPacket.getTime(), bookKeeping[tid].first, curData[tid].dataSize, thisPacket.getDataLen(), curData[tid].partNum, bookKeeping[tid].first + curData[tid].keyNum);
getNext(false);
}
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++) {
if (curData.count(it->first) && !pagesByTrack[it->first].count(bookKeeping[it->first].first)) {
pagesByTrack[it->first][bookKeeping[it->first].first] = curData[it->first];
} }
bookKeeping[tid].curKey++;
curData[tid].keyNum++;
curData[tid].partNum = 0;
} }
curData[tid].dataSize += thisPacket.getDataLen();
curData[tid].partNum ++;
bookKeeping[tid].curPart ++;
DEBUG_MSG(DLVL_DONTEVEN, "Track %ld:%llu on page %d@%llu (len:%d), being part %lu of key %lu", thisPacket.getTrackId(), thisPacket.getTime(), bookKeeping[tid].first, curData[tid].dataSize, thisPacket.getDataLen(), curData[tid].partNum, bookKeeping[tid].first+curData[tid].keyNum);
getNext(false);
} }
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++) { for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++) {
if (curData.count(it->first) && !pagesByTrack[it->first].count(bookKeeping[it->first].first)){ if (!pagesByTrack.count(it->first)) {
pagesByTrack[it->first][bookKeeping[it->first].first] = curData[it->first]; DEBUG_MSG(DLVL_WARN, "No pages for track %d found", it->first);
} } else {
} DEBUG_MSG(DLVL_MEDIUM, "Track %d (%s) split into %lu pages", it->first, myMeta.tracks[it->first].codec.c_str(), pagesByTrack[it->first].size());
} for (std::map<unsigned long, DTSCPageData>::iterator it2 = pagesByTrack[it->first].begin(); it2 != pagesByTrack[it->first].end(); it2++) {
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){ DEBUG_MSG(DLVL_VERYHIGH, "Page %lu-%lu, (%llu bytes)", it2->first, it2->first + it2->second.keyNum - 1, it2->second.dataSize);
if (!pagesByTrack.count(it->first)){ }
DEBUG_MSG(DLVL_WARN, "No pages for track %d found", it->first);
}else{
DEBUG_MSG(DLVL_MEDIUM, "Track %d (%s) split into %lu pages", it->first, myMeta.tracks[it->first].codec.c_str(), pagesByTrack[it->first].size());
for (std::map<unsigned long, DTSCPageData>::iterator it2 = pagesByTrack[it->first].begin(); it2 != pagesByTrack[it->first].end(); it2++){
DEBUG_MSG(DLVL_VERYHIGH, "Page %lu-%lu, (%llu bytes)", it2->first, it2->first + it2->second.keyNum - 1, it2->second.dataSize);
}
} }
} }
} }
@ -345,15 +390,24 @@ namespace Mist {
bool Input::bufferFrame(unsigned int track, unsigned int keyNum){ bool Input::bufferFrame(unsigned int track, unsigned int keyNum){
VERYHIGH_MSG("bufferFrame for stream %s, track %u, key %u", streamName.c_str(), track, keyNum); VERYHIGH_MSG("bufferFrame for stream %s, track %u, key %u", streamName.c_str(), track, keyNum);
if (keyNum > myMeta.tracks[track].keys.size()){ if (keyNum >= myMeta.tracks[track].keys.size()){
//End of movie here, returning true to avoid various error messages //End of movie here, returning true to avoid various error messages
VERYHIGH_MSG("Key number is higher than total key count. Cancelling bufferFrame"); VERYHIGH_MSG("Key number is higher than total key count. Cancelling bufferFrame");
return true; return true;
} }
if (keyNum < 1){keyNum = 1;} if (keyNum < 1) {
//abort in case already buffered keyNum = 1;
int pageNumber = bufferedOnPage(track, keyNum); }
if (pageNumber){ if (isBuffered(track, keyNum)) {
//get corresponding page number
int pageNumber = 0;
for (std::map<unsigned long, DTSCPageData>::iterator it = pagesByTrack[track].begin(); it != pagesByTrack[track].end(); it++) {
if (it->first <= keyNum) {
pageNumber = it->first;
} else {
break;
}
}
pageCounter[track][pageNumber] = 15; pageCounter[track][pageNumber] = 15;
VERYHIGH_MSG("Track %u, key %u is already buffered in page %d. Cancelling bufferFrame", track, keyNum, pageNumber); VERYHIGH_MSG("Track %u, key %u is already buffered in page %d. Cancelling bufferFrame", track, keyNum, pageNumber);
return true; return true;
@ -366,7 +420,7 @@ namespace Mist {
INFO_MSG("Loading key %u from page %lu", keyNum, (--(pagesByTrack[track].upper_bound(keyNum)))->first); INFO_MSG("Loading key %u from page %lu", keyNum, (--(pagesByTrack[track].upper_bound(keyNum)))->first);
keyNum = (--(pagesByTrack[track].upper_bound(keyNum)))->first; keyNum = (--(pagesByTrack[track].upper_bound(keyNum)))->first;
if (!bufferStart(track, keyNum)){ if (!bufferStart(track, keyNum)){
WARN_MSG("bufferStart failed! Cancelling bufferFrame", track); WARN_MSG("bufferStart failed! Cancelling bufferFrame");
return false; return false;
} }
@ -375,16 +429,16 @@ namespace Mist {
trackSelect(trackSpec.str()); trackSelect(trackSpec.str());
seek(myMeta.tracks[track].keys[keyNum - 1].getTime()); seek(myMeta.tracks[track].keys[keyNum - 1].getTime());
long long unsigned int stopTime = myMeta.tracks[track].lastms + 1; long long unsigned int stopTime = myMeta.tracks[track].lastms + 1;
if ((int)myMeta.tracks[track].keys.size() > keyNum - 1 + pagesByTrack[track][keyNum].keyNum){ if ((int)myMeta.tracks[track].keys.size() > keyNum - 1 + pagesByTrack[track][keyNum].keyNum) {
stopTime = myMeta.tracks[track].keys[keyNum - 1 + pagesByTrack[track][keyNum].keyNum].getTime(); stopTime = myMeta.tracks[track].keys[keyNum - 1 + pagesByTrack[track][keyNum].keyNum].getTime();
} }
DEBUG_MSG(DLVL_HIGH, "Playing from %llu to %llu", myMeta.tracks[track].keys[keyNum - 1].getTime(), stopTime); DEBUG_MSG(DLVL_HIGH, "Playing from %llu to %llu", myMeta.tracks[track].keys[keyNum - 1].getTime(), stopTime);
getNext(); getNext();
//in case earlier seeking was inprecise, seek to the exact point //in case earlier seeking was inprecise, seek to the exact point
while (thisPacket && thisPacket.getTime() < (unsigned long long)myMeta.tracks[track].keys[keyNum - 1].getTime()){ while (thisPacket && thisPacket.getTime() < (unsigned long long)myMeta.tracks[track].keys[keyNum - 1].getTime()) {
getNext(); getNext();
} }
while (thisPacket && thisPacket.getTime() < stopTime){ while (thisPacket && thisPacket.getTime() < stopTime) {
bufferNext(thisPacket); bufferNext(thisPacket);
getNext(); getNext();
} }
@ -393,39 +447,39 @@ namespace Mist {
pageCounter[track][keyNum] = 15; pageCounter[track][keyNum] = 15;
return true; return true;
} }
bool Input::atKeyFrame(){ bool Input::atKeyFrame() {
static std::map<int, unsigned long long> lastSeen; static std::map<int, unsigned long long> lastSeen;
//not in keyTimes? We're not at a keyframe. //not in keyTimes? We're not at a keyframe.
unsigned int c = keyTimes[thisPacket.getTrackId()].count(thisPacket.getTime()); unsigned int c = keyTimes[thisPacket.getTrackId()].count(thisPacket.getTime());
if (!c){ if (!c) {
return false; return false;
} }
//skip double times //skip double times
if (lastSeen.count(thisPacket.getTrackId()) && lastSeen[thisPacket.getTrackId()] == thisPacket.getTime()){ if (lastSeen.count(thisPacket.getTrackId()) && lastSeen[thisPacket.getTrackId()] == thisPacket.getTime()) {
return false; return false;
} }
//set last seen, and return true //set last seen, and return true
lastSeen[thisPacket.getTrackId()] = thisPacket.getTime(); lastSeen[thisPacket.getTrackId()] = thisPacket.getTime();
return true; return true;
} }
void Input::play(int until){ void Input::play(int until) {
playing = -1; playing = -1;
playUntil = until; playUntil = until;
initialTime = 0; initialTime = 0;
benchMark = Util::getMS(); benchMark = Util::getMS();
} }
void Input::playOnce(){ void Input::playOnce() {
if (playing <= 0){ if (playing <= 0) {
playing = 1; playing = 1;
} }
++playing; ++playing;
benchMark = Util::getMS(); benchMark = Util::getMS();
} }
void Input::quitPlay(){ void Input::quitPlay() {
playing = 0; playing = 0;
} }
} }

View file

@ -20,6 +20,7 @@ namespace Mist {
public: public:
Input(Util::Config * cfg); Input(Util::Config * cfg);
virtual int run(); virtual int run();
virtual void argumentsParsed(){}
virtual ~Input() {}; virtual ~Input() {};
protected: protected:
static void callbackWrapper(char * data, size_t len, unsigned int id); static void callbackWrapper(char * data, size_t len, unsigned int id);
@ -36,11 +37,10 @@ namespace Mist {
virtual void removeUnused(); virtual void removeUnused();
virtual void trackSelect(std::string trackSpec){}; virtual void trackSelect(std::string trackSpec){};
virtual void userCallback(char * data, size_t len, unsigned int id); virtual void userCallback(char * data, size_t len, unsigned int id);
virtual void convert();
void serve(); virtual void serve();
void convert();
void parseHeader(); virtual void parseHeader();
bool bufferFrame(unsigned int track, unsigned int keyNum); bool bufferFrame(unsigned int track, unsigned int keyNum);
unsigned int packTime;///Media-timestamp of the last packet. unsigned int packTime;///Media-timestamp of the last packet.

View file

@ -13,7 +13,7 @@
#include "input_buffer.h" #include "input_buffer.h"
#ifndef TIMEOUTMULTIPLIER #ifndef TIMEOUTMULTIPLIER
#define TIMEOUTMULTIPLIER 10 #define TIMEOUTMULTIPLIER 2
#endif #endif
namespace Mist { namespace Mist {
@ -71,6 +71,17 @@ namespace Mist {
capa["optional"]["segmentsize"]["type"] = "uint"; capa["optional"]["segmentsize"]["type"] = "uint";
capa["optional"]["segmentsize"]["default"] = 5000LL; capa["optional"]["segmentsize"]["default"] = 5000LL;
option.null(); option.null();
option["arg"] = "integer";
option["long"] = "udp-port";
option["short"] = "U";
option["help"] = "The UDP port on which to listen for TS Packets";
option["value"].append(0LL);
config->addOption("udpport", option);
capa["optional"]["udpport"]["name"] = "TS/UDP port";
capa["optional"]["udpport"]["help"] = "The UDP port on which to listen for TS Packets, or 0 for disabling TS Input";
capa["optional"]["udpport"]["option"] = "--udp-port";
capa["optional"]["udpport"]["type"] = "uint";
capa["optional"]["udpport"]["default"] = 0LL;
/*LTS-end*/ /*LTS-end*/
capa["source_match"] = "push://*"; capa["source_match"] = "push://*";
capa["priority"] = 9ll; capa["priority"] = 9ll;
@ -334,12 +345,21 @@ namespace Mist {
curPage.erase(tid); curPage.erase(tid);
bufferLocations[tid].erase(bufferLocations[tid].begin()); bufferLocations[tid].erase(bufferLocations[tid].begin());
} }
//Reset the userpage, to allow repushing from TS
IPC::userConnection userConn(pushLocation[it->first]);
for (int i = 0; i < SIMUL_TRACKS; i++){
if (userConn.getTrackId(i) == it->first) {
userConn.setTrackId(i, 0);
userConn.setKeynum(i, 0);
break;
}
}
curPageNum.erase(it->first); curPageNum.erase(it->first);
metaPages[it->first].master = true; metaPages[it->first].master = true;
metaPages.erase(it->first); metaPages.erase(it->first);
activeTracks.erase(it->first); activeTracks.erase(it->first);
pushLocation.erase(it->first); pushLocation.erase(it->first);
myMeta.tracks.erase(it); myMeta.tracks.erase(it->first);
changed = true; changed = true;
break; break;
} }
@ -389,10 +409,10 @@ namespace Mist {
//Get the counter of this user //Get the counter of this user
char counter = (*(data - 1)); char counter = (*(data - 1));
//Each user can have at maximum SIMUL_TRACKS elements in their userpage. //Each user can have at maximum SIMUL_TRACKS elements in their userpage.
IPC::userConnection userConn(data);
for (int index = 0; index < SIMUL_TRACKS; index++){ for (int index = 0; index < SIMUL_TRACKS; index++){
char * thisData = data + (index * 6);
//Get the track id from the current element //Get the track id from the current element
unsigned long value = ((long)(thisData[0]) << 24) | ((long)(thisData[1]) << 16) | ((long)(thisData[2]) << 8) | thisData[3]; unsigned long value = userConn.getTrackId(index);
//Skip value 0xFFFFFFFF as this indicates a previously declined track //Skip value 0xFFFFFFFF as this indicates a previously declined track
if (value == 0xFFFFFFFF){ if (value == 0xFFFFFFFF){
continue; continue;
@ -429,15 +449,11 @@ namespace Mist {
//Add the temporary track id to the list of tracks that are currently being negotiated //Add the temporary track id to the list of tracks that are currently being negotiated
negotiatingTracks.insert(tempMapping); negotiatingTracks.insert(tempMapping);
//Write the temporary id to the userpage element //Write the temporary id to the userpage element
thisData[0] = (tempMapping >> 24) & 0xFF; userConn.setTrackId(index, tempMapping);
thisData[1] = (tempMapping >> 16) & 0xFF;
thisData[2] = (tempMapping >> 8) & 0xFF;
thisData[3] = (tempMapping) & 0xFF;
//Obtain the original track number for the pushing process //Obtain the original track number for the pushing process
unsigned long originalTrack = ((long)(thisData[4]) << 8) | thisData[5]; unsigned long originalTrack = userConn.getKeynum(index);
//Overwrite it with 0xFFFF //Overwrite it with 0xFFFF
thisData[4] = 0xFF; userConn.setKeynum(index, 0xFFFF);
thisData[5] = 0xFF;
DEBUG_MSG(DLVL_HIGH, "Incoming track %lu from pushing process %d has now been assigned temporary id %llu", originalTrack, id, tempMapping); DEBUG_MSG(DLVL_HIGH, "Incoming track %lu from pushing process %d has now been assigned temporary id %llu", originalTrack, id, tempMapping);
} }
@ -498,7 +514,6 @@ namespace Mist {
} }
} }
/*LTS-END*/ /*LTS-END*/
//Remove the "negotiate" status in either case //Remove the "negotiate" status in either case
negotiatingTracks.erase(value); negotiatingTracks.erase(value);
//Set master to true before erasing the page, because we are responsible for cleaning up unused pages //Set master to true before erasing the page, because we are responsible for cleaning up unused pages
@ -547,25 +562,20 @@ namespace Mist {
pushLocation[finalMap] = data; pushLocation[finalMap] = data;
//Initialize the metadata for this track if it was not in place yet. //Initialize the metadata for this track if it was not in place yet.
if (!myMeta.tracks.count(finalMap)){ if (!myMeta.tracks.count(finalMap)){
DEBUG_MSG(DLVL_HIGH, "Inserting metadata for track number %d", finalMap); DEBUG_MSG(DLVL_MEDIUM, "Inserting metadata for track number %d", finalMap);
myMeta.tracks[finalMap] = trackMeta.tracks.begin()->second; myMeta.tracks[finalMap] = trackMeta.tracks.begin()->second;
myMeta.tracks[finalMap].trackID = finalMap; myMeta.tracks[finalMap].trackID = finalMap;
} }
//Write the final mapped track number to the user page element //Write the final mapped track number and keyframe number to the user page element
thisData[0] = (finalMap >> 24) & 0xFF;
thisData[1] = (finalMap >> 16) & 0xFF;
thisData[2] = (finalMap >> 8) & 0xFF;
thisData[3] = (finalMap) & 0xFF;
//Write the key number to start pushing from to to the userpage element.
//This is used to resume pushing as well as pushing new tracks //This is used to resume pushing as well as pushing new tracks
unsigned long keyNum = myMeta.tracks[finalMap].keys.size(); userConn.setTrackId(index, finalMap);
thisData[4] = (keyNum >> 8) & 0xFF; userConn.setKeynum(index, myMeta.tracks[finalMap].keys.size());
thisData[5] = keyNum & 0xFF;
//Update the metadata to reflect all changes //Update the metadata to reflect all changes
updateMeta(); updateMeta();
} }
//If the track is active, and this is the element responsible for pushing it //If the track is active, and this is the element responsible for pushing it
if (activeTracks.count(value) && pushLocation[value] == data){ if (activeTracks.count(value) && pushLocation[value] == data){
INFO_MSG("Track is live and pushin'");
//Open the track index page if we dont have it open yet //Open the track index page if we dont have it open yet
if (!metaPages.count(value) || !metaPages[value].mapped){ if (!metaPages.count(value) || !metaPages[value].mapped){
char firstPage[NAME_BUFFER_SIZE]; char firstPage[NAME_BUFFER_SIZE];
@ -581,6 +591,7 @@ namespace Mist {
} }
void inputBuffer::updateTrackMeta(unsigned long tNum){ void inputBuffer::updateTrackMeta(unsigned long tNum){
INFO_MSG("Updating meta for track %d", tNum);
//Store a reference for easier access //Store a reference for easier access
std::map<unsigned long, DTSCPageData> & locations = bufferLocations[tNum]; std::map<unsigned long, DTSCPageData> & locations = bufferLocations[tNum];
@ -591,6 +602,7 @@ namespace Mist {
continue; continue;
} }
unsigned long keyNum = ntohl(tmpOffset[0]); unsigned long keyNum = ntohl(tmpOffset[0]);
INFO_MSG("Page %d detected, with %d keys", keyNum, ntohl(tmpOffset[1]));
//Add an entry into bufferLocations[tNum] for the pages we haven't handled yet. //Add an entry into bufferLocations[tNum] for the pages we haven't handled yet.
if (!locations.count(keyNum)){ if (!locations.count(keyNum)){
@ -599,7 +611,6 @@ namespace Mist {
locations[keyNum].pageNum = keyNum; locations[keyNum].pageNum = keyNum;
locations[keyNum].keyNum = ntohl(tmpOffset[1]); locations[keyNum].keyNum = ntohl(tmpOffset[1]);
} }
//Since the map is ordered by keynumber, this loop updates the metadata for each page from oldest to newest //Since the map is ordered by keynumber, this loop updates the metadata for each page from oldest to newest
for (std::map<unsigned long, DTSCPageData>::iterator pageIt = locations.begin(); pageIt != locations.end(); pageIt++){ for (std::map<unsigned long, DTSCPageData>::iterator pageIt = locations.begin(); pageIt != locations.end(); pageIt++){
updateMetaFromPage(tNum, pageIt->first); updateMetaFromPage(tNum, pageIt->first);
@ -608,6 +619,7 @@ namespace Mist {
} }
void inputBuffer::updateMetaFromPage(unsigned long tNum, unsigned long pageNum){ void inputBuffer::updateMetaFromPage(unsigned long tNum, unsigned long pageNum){
INFO_MSG("Updating meta for track %d page %d", tNum, pageNum);
DTSCPageData & pageData = bufferLocations[tNum][pageNum]; DTSCPageData & pageData = bufferLocations[tNum][pageNum];
//If the current page is over its 8mb "splitting" boundary //If the current page is over its 8mb "splitting" boundary
@ -615,6 +627,7 @@ namespace Mist {
//And the last keyframe in the parsed metadata is further in the stream than this page //And the last keyframe in the parsed metadata is further in the stream than this page
if (pageData.pageNum + pageData.keyNum < myMeta.tracks[tNum].keys.rbegin()->getNumber()){ if (pageData.pageNum + pageData.keyNum < myMeta.tracks[tNum].keys.rbegin()->getNumber()){
//Assume the entire page is already parsed //Assume the entire page is already parsed
INFO_MSG("Assuming its already done", tNum, pageNum);
return; return;
} }
} }

View file

@ -26,22 +26,36 @@ namespace Mist {
capa["codecs"][0u][0u].append("H264"); capa["codecs"][0u][0u].append("H264");
capa["codecs"][0u][1u].append("AAC"); capa["codecs"][0u][1u].append("AAC");
capa["codecs"][0u][1u].append("AC3"); capa["codecs"][0u][1u].append("AC3");
capa["optional"]["port"]["name"] = "UDP Port";
capa["optional"]["port"]["help"] = "The udp port on which to listen for incoming UDP Packets";
capa["optional"]["port"]["type"] = "uint";
capa["optional"]["port"]["default"] = 9876;
capa["optional"]["port"]["option"] = "--port";
cfg->addOption("port",
JSON::fromString("{\"arg\":\"integer\",\"value\":9876,\"short\":\"p\",\"long\":\"port\",\"help\":\"The udp port on which to listen for incoming UDP Packets.\"}"));
pushing = false;
} }
///Setup of TS Input ///Setup of TS Input
bool inputTS::setup() { bool inputTS::setup() {
#ifdef INPUT_LIVE
if (config->getString("input") == "-") { if (config->getString("input") == "-") {
inFile = stdin; inFile = stdin;
}else{ }else{
inFile = fopen(config->getString("input").c_str(), "r"); pushing = true;
udpCon.setBlocking(false);
udpCon.bind(config->getInteger("port"));
} }
#else
if (config->getString("output") != "-") { if (config->getString("input") != "-"){
std::cerr << "Output to non-stdout not yet supported" << std::endl; inFile = fopen(config->getString("input").c_str(), "r");
} }
if (!inFile) { if (!inFile) {
return false; return false;
} }
#endif
return true; return true;
} }
@ -62,154 +76,6 @@ namespace Mist {
} }
} }
void inputTS::parsePESHeader(int tid, pesBuffer & buf){
if (buf.data.size() < 9){
return;
}
if (buf.data.size() < 9 + buf.data[8]){
return;
}
if( (((int)buf.data[0] << 16) | ((int)buf.data[1] << 8) | buf.data[2]) != 0x000001){
DEBUG_MSG(DLVL_WARN, "Parsing PES for track %d failed due to incorrect header (%0.6X), throwing away", tid, (((int)buf.data[0] << 16) | ((int)buf.data[1] << 8) | buf.data[2]) );
buf.data = "";
return;
}
buf.len = (((int)buf.data[4] << 8) | buf.data[5]) - (3 + buf.data[8]);
if ((buf.data[7] >> 6) & 0x02){//Check for PTS presence
buf.time = ((buf.data[9] >> 1) & 0x07);
buf.time <<= 15;
buf.time |= ((int)buf.data[10] << 7) | ((buf.data[11] >> 1) & 0x7F);
buf.time <<= 15;
buf.time |= ((int)buf.data[12] << 7) | ((buf.data[13] >> 1) & 0x7F);
buf.time /= 90;
if (((buf.data[7] & 0xC0) >> 6) & 0x01){//Check for DTS presence (yes, only if PTS present)
buf.offset = buf.time;
buf.time = ((buf.data[14] >> 1) & 0x07);
buf.time <<= 15;
buf.time |= ((int)buf.data[15] << 7) | ((buf.data[16] >> 1) & 0x7F);
buf.time <<= 15;
buf.time |= ((int)buf.data[17] << 7) | ((buf.data[18] >> 1) & 0x7F);
buf.time /= 90;
buf.offset -= buf.time;
}
}
if (!firstTimes.count(tid)){
firstTimes[tid] = buf.time;
}
buf.time -= firstTimes[tid];
buf.data.erase(0, 9 + buf.data[8]);
}
void inputTS::parsePESPayload(int tid, pesBuffer & buf){
if (myMeta.tracks[tid].codec == "H264"){
parseH264PES(tid, buf);
}
if (myMeta.tracks[tid].codec == "AAC"){
parseAACPES(tid, buf);
}
}
void inputTS::parseAACPES(int tid, pesBuffer & buf){
if (!buf.data.size()){
buf.len = 0;
return;
}
if (myMeta.tracks[tid].init == ""){
char audioInit[2];//5 bits object type, 4 bits frequency index, 4 bits channel index
char AACProfile = ((buf.data[2] >> 6) & 0x03) + 1;
char frequencyIndex = ((buf.data[2] >> 2) & 0x0F);
char channelConfig = ((buf.data[2] & 0x01) << 2) | ((buf.data[3] >> 6) & 0x03);
switch(frequencyIndex){
case 0: myMeta.tracks[tid].rate = 96000; break;
case 1: myMeta.tracks[tid].rate = 88200; break;
case 2: myMeta.tracks[tid].rate = 64000; break;
case 3: myMeta.tracks[tid].rate = 48000; break;
case 4: myMeta.tracks[tid].rate = 44100; break;
case 5: myMeta.tracks[tid].rate = 32000; break;
case 6: myMeta.tracks[tid].rate = 24000; break;
case 7: myMeta.tracks[tid].rate = 22050; break;
case 8: myMeta.tracks[tid].rate = 16000; break;
case 9: myMeta.tracks[tid].rate = 12000; break;
case 10: myMeta.tracks[tid].rate = 11025; break;
case 11: myMeta.tracks[tid].rate = 8000; break;
case 12: myMeta.tracks[tid].rate = 7350; break;
default: myMeta.tracks[tid].rate = 0; break;
}
myMeta.tracks[tid].channels = channelConfig;
if (channelConfig == 7){
myMeta.tracks[tid].channels = 8;
}
audioInit[0] = ((AACProfile & 0x1F) << 3) | ((frequencyIndex & 0x0E) >> 1);
audioInit[1] = ((frequencyIndex & 0x01) << 7) | ((channelConfig & 0x0F) << 3);
myMeta.tracks[tid].init = std::string(audioInit, 2);
//\todo This value is right now hardcoded, maybe fix this when we know how
myMeta.tracks[tid].size = 16;
}
buf.len = (((buf.data[3] & 0x03) << 11) | (buf.data[4] << 3) | ((buf.data[5] >> 5) & 0x07)) - (buf.data[1] & 0x01 ? 7 :9);
buf.curSampleCount += 1024 * ((buf.data[6] & 0x3) + 1);//Number of frames * samples per frame(1024);
buf.data.erase(0, (buf.data[1] & 0x01 ? 7 : 9));//Substract header
}
void inputTS::parseH264PES(int tid, pesBuffer & buf){
static char annexB[] = {0x00,0x00,0x01};
static char nalLen[4];
int nalLength = 0;
std::string newData;
int pos = 0;
int nxtPos = buf.data.find(annexB, pos, 3);
//Rewrite buf.data from annexB to size-prefixed h.264
while (nxtPos != std::string::npos){
//Detect next packet (if any) and deduce current packet length
pos = nxtPos + 3;
nxtPos = buf.data.find(annexB, pos, 3);
if (nxtPos == std::string::npos){
nalLength = buf.data.size() - pos;
}else{
nalLength = nxtPos - pos;
if (buf.data[nxtPos - 1] == 0x00){//4-byte annexB header
nalLength--;
}
}
//Do nal type specific stuff
switch (buf.data[pos] & 0x1F){
case 0x05: buf.isKey = true; break;
case 0x07: buf.sps = buf.data.substr(pos, nalLength); break;
case 0x08: buf.pps = buf.data.substr(pos, nalLength); break;
default: break;
}
if ((buf.data[pos] & 0x1F) != 0x07 && (buf.data[pos] & 0x1F) != 0x08 && (buf.data[pos] & 0x1F) != 0x09){
//Append length + payload
nalLen[0] = (nalLength >> 24) & 0xFF;
nalLen[1] = (nalLength >> 16) & 0xFF;
nalLen[2] = (nalLength >> 8) & 0xFF;
nalLen[3] = nalLength & 0xFF;
newData.append(nalLen, 4);
newData += buf.data.substr(pos, nalLength);
}
}
buf.data = newData;
buf.len = newData.size();
//If this packet had both a Sequence Parameter Set (sps) and a Picture Parameter Set (pps), calculate the metadata for the stream
if (buf.sps != "" && buf.pps != ""){
MP4::AVCC avccBox;
avccBox.setVersion(1);
avccBox.setProfile(buf.sps[1]);
avccBox.setCompatibleProfiles(buf.sps[2]);
avccBox.setLevel(buf.sps[3]);
avccBox.setSPSNumber(1);
avccBox.setSPS(buf.sps);
avccBox.setPPSNumber(1);
avccBox.setPPS(buf.pps);
myMeta.tracks[tid].init = std::string(avccBox.payload(), avccBox.payloadSize());
h264::SPS tmpNal(buf.sps, true);
h264::SPSMeta tmpMeta = tmpNal.getCharacteristics();
myMeta.tracks[tid].width = tmpMeta.width;
myMeta.tracks[tid].height = tmpMeta.height;
myMeta.tracks[tid].fpks = tmpMeta.fps * 1000;
}
}
///Reads headers from a TS stream, and saves them into metadata ///Reads headers from a TS stream, and saves them into metadata
///It works by going through the entire TS stream, and every time ///It works by going through the entire TS stream, and every time
@ -229,288 +95,93 @@ namespace Mist {
TS::Packet packet;//to analyse and extract data TS::Packet packet;//to analyse and extract data
fseek(inFile, 0, SEEK_SET);//seek to beginning fseek(inFile, 0, SEEK_SET);//seek to beginning
JSON::Value thisPacket;
std::set<int> PATIds;
std::map<int, int> pidToType;
std::map<int, pesBuffer> lastBuffer;
//h264::SPSmMta spsdata;//to analyse sps data, and extract resolution etc... bool first = true;
long long int lastBpos = 0; long long int lastBpos = 0;
while (packet.FromFile(inFile)){ while (packet.FromFile(inFile)){
//Handle special packets (PAT/PMT) tsStream.parse(packet, lastBpos);
if(packet.getPID() == 0x00){
PATIds.clear();
for (int i = 0; i < ((TS::ProgramAssociationTable&)packet).getProgramCount(); i++){
PATIds.insert(((TS::ProgramAssociationTable&)packet).getProgramPID(i));
}
}
if(PATIds.count(packet.getPID())){
TS::ProgramMappingEntry entry = ((TS::ProgramMappingTable&)packet).getEntry(0);
while(entry){
unsigned int pid = entry.getElementaryPid();
pidToType[pid] = entry.getStreamType();
//Check if the track exists in metadata
if (!myMeta.tracks.count(pid)){
switch (entry.getStreamType()){
case 0x1B:
myMeta.tracks[pid].codec = "H264";
myMeta.tracks[pid].type = "video";
myMeta.tracks[pid].trackID = pid;
break;
case 0x0F:
myMeta.tracks[pid].codec = "AAC";
myMeta.tracks[pid].type = "audio";
myMeta.tracks[pid].trackID = pid;
break;
case 0x81:
myMeta.tracks[pid].codec = "AC3";
myMeta.tracks[pid].type = "audio";
myMeta.tracks[pid].trackID = pid;
break;
default:
DEBUG_MSG(DLVL_WARN, "Ignoring unsupported track type %0.2X, on pid %d", entry.getStreamType(), pid);
break;
}
}
entry.advance();
}
}
if(pidToType.count(packet.getPID())){
//analyzing audio/video
//we have audio/video payload
//get trackID of this packet
int tid = packet.getPID();
if (packet.getUnitStart() && lastBuffer.count(tid) && lastBuffer[tid].len){
parsePESPayload(tid, lastBuffer[tid]);
thisPacket.null();
thisPacket["data"] = lastBuffer[tid].data;
thisPacket["trackid"] = tid;//last trackid
thisPacket["bpos"] = lastBuffer[tid].bpos;
thisPacket["time"] = lastBuffer[tid].time ;
if (lastBuffer[tid].offset){
thisPacket["offset"] = lastBuffer[tid].offset;
}
if (lastBuffer[tid].isKey){
thisPacket["keyframe"] = 1LL;
}
myMeta.update(thisPacket);//metadata was read in
lastBuffer.erase(tid);
}
if (!lastBuffer.count(tid)){
lastBuffer[tid] = pesBuffer();
lastBuffer[tid].bpos = lastBpos;
}
lastBuffer[tid].data.append(packet.getPayload(), packet.getPayloadLength());
if (!lastBuffer[tid].len){
parsePESHeader(tid, lastBuffer[tid]);
}
if (lastBuffer[tid].data.size() == lastBuffer[tid].len) {
parsePESPayload(tid, lastBuffer[tid]);
if (myMeta.tracks[tid].codec == "AAC"){
while(lastBuffer[tid].data.size()){
thisPacket.null();
thisPacket["data"] = lastBuffer[tid].data.substr(0, lastBuffer[tid].len);
thisPacket["trackid"] = tid;//last trackid
thisPacket["bpos"] = lastBuffer[tid].bpos;
thisPacket["time"] = lastBuffer[tid].time + (long long int)((double)((lastBuffer[tid].curSampleCount - 1024) * 1000)/ myMeta.tracks[tid].rate) ;
myMeta.update(thisPacket);//metadata was read in
lastBuffer[tid].data.erase(0, lastBuffer[tid].len);
parsePESPayload(tid, lastBuffer[tid]);
}
}else{
thisPacket.null();
thisPacket["data"] = lastBuffer[tid].data;
thisPacket["trackid"] = tid;//last trackid
thisPacket["bpos"] = lastBuffer[tid].bpos;
thisPacket["time"] = lastBuffer[tid].time ;
if (myMeta.tracks[tid].type == "video"){
if (lastBuffer[tid].offset){
thisPacket["offset"] = lastBuffer[tid].offset;
}
if (lastBuffer[tid].isKey){
thisPacket["keyframe"] = 1LL;
}
}
myMeta.update(thisPacket);//metadata was read in
}
lastBuffer.erase(tid);
}
}
lastBpos = ftell(inFile); lastBpos = ftell(inFile);
while(tsStream.hasPacketOnEachTrack()){
if (first){
tsStream.initializeMetadata(myMeta);
first = false;
}
DTSC::Packet headerPack;
tsStream.getEarliestPacket(headerPack);
myMeta.update(headerPack);
}
} }
std::ofstream oFile(std::string(config->getString("input") + ".dtsh").c_str()); std::ofstream oFile(std::string(config->getString("input") + ".dtsh").c_str());
oFile << myMeta.toJSON().toNetPacked(); oFile << myMeta.toJSON().toNetPacked();
oFile.close(); oFile.close();
exit(1);
return true; return true;
} }
///Reads a full PES packet, starting at the current byteposition
///Assumes that you want a full PES for the first PID encountered
///\todo Update to search for a specific PID
pesBuffer inputTS::readFullPES(int tid){
pesBuffer pesBuf;
pesBuf.tid = tid;
if (feof(inFile)){
DEBUG_MSG(DLVL_DEVEL, "Trying to read a PES past the end of the file, returning");
return pesBuf;
}
unsigned int lastPos = ftell(inFile);
TS::Packet tsBuf;
tsBuf.FromFile(inFile);
//Find first PES start on the selected track
while (tsBuf.getPID() != tid || !tsBuf.getUnitStart()){
lastPos = ftell(inFile);
tsBuf.FromFile(inFile);
if (feof(inFile)){
return pesBuf;
}
}
pesBuf.bpos = lastPos;
pesBuf.data.append(tsBuf.getPayload(), tsBuf.getPayloadLength());
parsePESHeader(tid, pesBuf);
bool unbound = false;
while (pesBuf.data.size() != pesBuf.len){
//ReadNextPage
tsBuf.FromFile(inFile);
if (tsBuf.getPID() == tid && tsBuf.getUnitStart()){
unbound = true;
break;
}
if (feof(inFile)){
DEBUG_MSG(DLVL_DEVEL, "Reached EOF at an unexpected point... what happened?");
return pesBuf;
}
if (tsBuf.getPID() == tid){
pesBuf.data.append(tsBuf.getPayload(), tsBuf.getPayloadLength());
pesBuf.lastPos = ftell(inFile);
}
if (pesBuf.len == 0){
parsePESHeader(tid, pesBuf);
}
}
pesBuf.lastPos = ftell(inFile);
if (unbound){
pesBuf.lastPos -= 188;
}
parsePESPayload(tid, pesBuf);
return pesBuf;
}
///Gets the next packet that is to be sent ///Gets the next packet that is to be sent
///At the moment, the logic of sending the last packet that was finished has been implemented, ///At the moment, the logic of sending the last packet that was finished has been implemented,
///but the seeking and finding data is not yet ready. ///but the seeking and finding data is not yet ready.
///\todo Finish the implementation ///\todo Finish the implementation
void inputTS::getNext(bool smart){ void inputTS::getNext(bool smart){
static JSON::Value thisPack; thisPacket.null();
if ( !playbackBuf.size()){ bool hasPacket = (selectedTracks.size() == 1 ? tsStream.hasPacket(*selectedTracks.begin()) : tsStream.hasPacketOnEachTrack());
DEBUG_MSG(DLVL_WARN, "No seek positions set - returning empty packet.");
thisPacket.null(); if (!hasPacket && (pushing || !feof(inFile))){
TS::Packet tsBuf;
if (!pushing) {
unsigned int bPos = ftell(inFile);
tsBuf.FromFile(inFile);
tsStream.parse(tsBuf, bPos);
}else{
while (udpCon.Receive()){
userClient.keepAlive();
udpDataBuffer.append(udpCon.data, udpCon.data_len);
while (udpDataBuffer.size() > 188 && (udpDataBuffer[0] != 0x47 || udpDataBuffer[188] != 0x47)){
size_t syncPos = udpDataBuffer.find("\107", 1);
udpDataBuffer.erase(0, syncPos);
}
while (udpDataBuffer.size() >= 188){
tsBuf.FromPointer(udpDataBuffer.data());
tsStream.parse(tsBuf, 0);
udpDataBuffer.erase(0,188);
}
}
}
hasPacket = (selectedTracks.size() == 1 ? tsStream.hasPacket(*selectedTracks.begin()) : tsStream.hasPacketOnEachTrack());
}
if (!hasPacket){
return; return;
} }
if (selectedTracks.size() == 1){
//Store current buffer tsStream.getPacket(*selectedTracks.begin(), thisPacket);
pesBuffer thisBuf = *playbackBuf.begin(); }else{
playbackBuf.erase(playbackBuf.begin()); tsStream.getEarliestPacket(thisPacket);
//Seek follow up
fseek(inFile, thisBuf.lastPos, SEEK_SET);
pesBuffer nxtBuf;
if (myMeta.tracks[thisBuf.tid].codec != "AAC" || playbackBuf.size() < 2){
nxtBuf = readFullPES(thisBuf.tid);
} }
if (nxtBuf.len){ tsStream.initializeMetadata(myMeta);
if (myMeta.tracks[nxtBuf.tid].codec == "AAC"){//only in case of aac we have more packets, for now if (!myMeta.tracks.count(thisPacket.getTrackId())){
while (nxtBuf.len){ getNext();
pesBuffer pesBuf;
pesBuf.tid = nxtBuf.tid;
pesBuf.time = nxtBuf.time + ((double)((nxtBuf.curSampleCount - 1024) * 1000)/ myMeta.tracks[nxtBuf.tid].rate) ;
pesBuf.offset = nxtBuf.offset;
pesBuf.len = nxtBuf.len;
pesBuf.lastPos = nxtBuf.lastPos;
pesBuf.isKey = false;
pesBuf.data = nxtBuf.data.substr(0, nxtBuf.len);
playbackBuf.insert(pesBuf);
nxtBuf.data.erase(0, nxtBuf.len);
parsePESPayload(thisBuf.tid, nxtBuf);
}
}else{
nxtBuf.data = nxtBuf.data.substr(0, nxtBuf.len);
playbackBuf.insert(nxtBuf);
}
} }
thisPack.null();
thisPack["data"] = thisBuf.data;
thisPack["trackid"] = thisBuf.tid;
thisPack["bpos"] = thisBuf.bpos;
thisPack["time"] = thisBuf.time;
if (thisBuf.offset){
thisPack["offset"] = thisBuf.offset;
}
if (thisBuf.isKey){
thisPack["keyframe"] = 1LL;
}
std::string tmpStr = thisPack.toNetPacked();
thisPacket.reInit(tmpStr.data(), tmpStr.size());
} }
///Seeks to a specific time ///Seeks to a specific time
void inputTS::seek(int seekTime){ void inputTS::seek(int seekTime){
tsStream.clear();
unsigned long seekPos = 0xFFFFFFFFull;
for (std::set<unsigned long>::iterator it = selectedTracks.begin(); it != selectedTracks.end(); it++){ for (std::set<unsigned long>::iterator it = selectedTracks.begin(); it != selectedTracks.end(); it++){
if (feof(inFile)){ unsigned long thisBPos = 0;
clearerr(inFile); for (std::deque<DTSC::Key>::iterator keyIt = myMeta.tracks[*it].keys.begin(); keyIt != myMeta.tracks[*it].keys.end(); keyIt++){
fseek(inFile, 0, SEEK_SET); if (keyIt->getTime() > seekTime){
}
pesBuffer tmpBuf;
tmpBuf.tid = *it;
for (unsigned int i = 0; i < myMeta.tracks[*it].keys.size(); i++){
if (myMeta.tracks[*it].keys[i].getTime() > seekTime){
break; break;
} }
if (myMeta.tracks[*it].keys[i].getTime() > tmpBuf.time){ thisBPos = keyIt->getBpos();
tmpBuf.time = myMeta.tracks[*it].keys[i].getTime();
tmpBuf.bpos = myMeta.tracks[*it].keys[i].getBpos();
}
} }
if (thisBPos < seekPos){
bool foundPacket = false; seekPos = thisBPos;
unsigned long long lastPos;
pesBuffer nxtBuf;
while ( !foundPacket){
lastPos = ftell(inFile);
if (feof(inFile)){
DEBUG_MSG(DLVL_WARN, "Reached EOF during seek to %u in track %d - aborting @ %lld", seekTime, *it, lastPos);
return;
}
fseek(inFile, tmpBuf.bpos, SEEK_SET);
nxtBuf = readFullPES(*it);
if (nxtBuf.time >= seekTime){
foundPacket = true;
}else{
tmpBuf.bpos = nxtBuf.lastPos;
}
}
if (myMeta.tracks[nxtBuf.tid].codec == "AAC"){//only in case of aac we have more packets, for now
while (nxtBuf.len){
pesBuffer pesBuf;
pesBuf.tid = nxtBuf.tid;
pesBuf.time = nxtBuf.time + ((double)((nxtBuf.curSampleCount - 1024) * 1000)/ myMeta.tracks[nxtBuf.tid].rate);
pesBuf.offset = nxtBuf.offset;
pesBuf.len = nxtBuf.len;
pesBuf.lastPos = nxtBuf.lastPos;
pesBuf.isKey = false;
pesBuf.data = nxtBuf.data.substr(0, nxtBuf.len);
playbackBuf.insert(pesBuf);
nxtBuf.data.erase(0, nxtBuf.len);
parsePESPayload(nxtBuf.tid, nxtBuf);
}
}else{
playbackBuf.insert(nxtBuf);
} }
} }
fseek(inFile, seekPos, SEEK_SET);//seek to the correct position
} }
} }

View file

@ -2,57 +2,12 @@
#include <mist/nal.h> #include <mist/nal.h>
#include <mist/dtsc.h> #include <mist/dtsc.h>
#include <mist/ts_packet.h> #include <mist/ts_packet.h>
#include <mist/ts_stream.h>
#include <string> #include <string>
#include <set> #include <set>
namespace Mist { namespace Mist {
class pesBuffer {
public:
pesBuffer() : lastPos(0), len(0), time(0), offset(0), bpos(0), curSampleCount(0), isKey(false) {}
///\brief Less-than comparison for seekPos structures.
///\param rhs The seekPos to compare with.
///\return Whether this object is smaller than rhs.
bool operator < (const pesBuffer & rhs) const {
if (time < rhs.time) {
return true;
} else {
if (time == rhs.time){
if (tid < rhs.tid){
return true;
}
}
}
return false;
}
int tid;//When used for buffering, not for header generation
long long int lastPos;//set by readFullPES, stores the byteposition directly after the last read ts packet
long long int len;
std::string data;
long long int time;
long long int offset;
long long int bpos;
long long int curSampleCount;
bool isKey;
std::string sps;
std::string pps;
};
/*
/// This struct stores all metadata of a track, and sends them once a whole PES has been analyzed and sent
struct trackInfo{
//saves all data that needs to be sent.
//as packets can be interleaved, the data needs to be temporarily stored
long long int lastPos;//last byte position of trackSelect
long long int pesTime;//the pes time of the current pes packet
bool keyframe;//whether the current pes packet of the track has a keyframe or not
std::string curPayload;//payload to be sent to user
unsigned int packetCount;//number of TS packets read between between and end (to set bpos correctly)
};
*/
/// This class contains all functions needed to implement TS Input /// This class contains all functions needed to implement TS Input
class inputTS : public Input { class inputTS : public Input {
public: public:
@ -64,16 +19,13 @@ namespace Mist {
void getNext(bool smart = true); void getNext(bool smart = true);
void seek(int seekTime); void seek(int seekTime);
void trackSelect(std::string trackSpec); void trackSelect(std::string trackSpec);
void parsePESHeader(int tid, pesBuffer & buf);
void parsePESPayload(int tid, pesBuffer & buf);
void parseH264PES(int tid, pesBuffer & buf);
void parseAACPES(int tid, pesBuffer & buf);
pesBuffer readFullPES(int tid);
FILE * inFile;///<The input file with ts data FILE * inFile;///<The input file with ts data
h264::NAL nal;///<Used to analyze raw h264 data TS::Stream tsStream;///<Used for parsing the incoming ts stream
long long int lastPos;///<last position played in file
std::set<pesBuffer> playbackBuf;///Used for buffering playback items bool pushing;
std::map<int, int> firstTimes; Socket::UDPConnection udpCon;
std::string udpDataBuffer;
}; };
} }

View file

@ -16,6 +16,8 @@ int main(int argc, char * argv[]) {
mistIn conv(&conf); mistIn conv(&conf);
if (conf.parseArgs(argc, argv)) { if (conf.parseArgs(argc, argv)) {
std::string streamName = conf.getString("streamname"); std::string streamName = conf.getString("streamname");
conv.argumentsParsed();
#ifndef INPUT_NOLOCK
IPC::semaphore playerLock; IPC::semaphore playerLock;
if (streamName.size()){ if (streamName.size()){
playerLock.open(std::string("/lock_" + streamName).c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1); playerLock.open(std::string("/lock_" + streamName).c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1);
@ -24,16 +26,21 @@ int main(int argc, char * argv[]) {
return 1; return 1;
} }
} }
#endif
conf.activate(); conf.activate();
while (conf.is_active){ while (conf.is_active){
pid_t pid = fork(); pid_t pid = fork();
if (pid == 0){ if (pid == 0){
#ifndef INPUT_NOLOCK
playerLock.close(); playerLock.close();
#endif
return conv.run(); return conv.run();
} }
if (pid == -1){ if (pid == -1){
DEBUG_MSG(DLVL_FAIL, "Unable to spawn player process"); DEBUG_MSG(DLVL_FAIL, "Unable to spawn player process");
#ifndef INPUT_NOLOCK
playerLock.post(); playerLock.post();
#endif
return 2; return 2;
} }
//wait for the process to exit //wait for the process to exit
@ -57,8 +64,10 @@ int main(int argc, char * argv[]) {
DEBUG_MSG(DLVL_DEVEL, "Input for stream %s uncleanly shut down! Restarting...", streamName.c_str()); DEBUG_MSG(DLVL_DEVEL, "Input for stream %s uncleanly shut down! Restarting...", streamName.c_str());
} }
} }
#ifndef INPUT_NOLOCK
playerLock.post(); playerLock.post();
playerLock.close(); playerLock.close();
#endif
} }
return 0; return 0;
} }

View file

@ -69,9 +69,11 @@ namespace Mist {
VERYHIGH_MSG("bufferStart for stream %s, track %lu, page %lu", streamName.c_str(), tid, pageNumber); VERYHIGH_MSG("bufferStart for stream %s, track %lu, page %lu", streamName.c_str(), tid, pageNumber);
initiateEncryption(); initiateEncryption();
//Initialize the stream metadata if it does not yet exist //Initialize the stream metadata if it does not yet exist
#ifndef INPUT_LIVE
if (!metaPages.count(0)) { if (!metaPages.count(0)) {
initiateMeta(); initiateMeta();
} }
#endif
//If we are a stand-alone player skip track negotiation, as there will be nothing to negotiate with. //If we are a stand-alone player skip track negotiation, as there will be nothing to negotiate with.
if (standAlone) { if (standAlone) {
if (!trackMap.count(tid)) { if (!trackMap.count(tid)) {
@ -86,7 +88,6 @@ namespace Mist {
///\return false if the track has not been accepted (yet) ///\return false if the track has not been accepted (yet)
return false; return false;
} }
//If the track is accepted, we will have a mapped tid //If the track is accepted, we will have a mapped tid
unsigned long mapTid = trackMap[tid]; unsigned long mapTid = trackMap[tid];
@ -196,7 +197,6 @@ namespace Mist {
IPC::releasePage(pageName); IPC::releasePage(pageName);
#endif #endif
toErase.master = true; toErase.master = true;
//Remove the page from the tracks index page //Remove the page from the tracks index page
DEBUG_MSG(DLVL_HIGH, "Removing page %lu on track %lu~>%lu from the corresponding metaPage", pageNumber, tid, mapTid); DEBUG_MSG(DLVL_HIGH, "Removing page %lu on track %lu~>%lu from the corresponding metaPage", pageNumber, tid, mapTid);
for (int i = 0; i < 1024; i++) { for (int i = 0; i < 1024; i++) {
@ -266,7 +266,7 @@ namespace Mist {
size_t curOffset = pagesByTrack[tid][curPageNum[tid]].curOffset; size_t curOffset = pagesByTrack[tid][curPageNum[tid]].curOffset;
//Do nothing when there is not enough free space on the page to add the packet. //Do nothing when there is not enough free space on the page to add the packet.
if (pagesByTrack[tid][curPageNum[tid]].dataSize - curOffset < pack.getDataLen()) { if (pagesByTrack[tid][curPageNum[tid]].dataSize - curOffset < pack.getDataLen()) {
FAIL_MSG("Trying to buffer a packet on page %lu for track %lu~>%lu, but we have a size mismatch. The packet is %lu bytes long, so won't fit at offset %lu on a page of %lu bytes!", curPageNum[tid], tid, mapTid, pack.getDataLen(), curOffset, pagesByTrack[tid][curPageNum[tid]].dataSize); FAIL_MSG("Trying to buffer a packet on page %lu for track %lu~>%lu, but we have a size mismatch. The packet is %d bytes long, so won't fit at offset %lu on a page of %llu bytes!", curPageNum[tid], tid, mapTid, pack.getDataLen(), curOffset, pagesByTrack[tid][curPageNum[tid]].dataSize);
return; return;
} }
@ -385,15 +385,24 @@ namespace Mist {
curPageNum.erase(tid); curPageNum.erase(tid);
} }
void InOutBase::bufferLivePacket(JSON::Value & packet) {
DTSC::Packet realPacket;
realPacket.genericFill(packet["time"].asInt(), packet["offset"].asInt(), packet["trackid"].asInt(), packet["data"].asStringRef().c_str(), packet["data"].asStringRef().size(), packet["bpos"].asInt(), packet["keyframe"].asInt());
bufferLivePacket(realPacket);
}
///Buffers a live packet to a page. ///Buffers a live packet to a page.
/// ///
///Handles both buffering and creation of new pages ///Handles both buffering and creation of new pages
/// ///
///Initiates/continues negotiation with the buffer as well ///Initiates/continues negotiation with the buffer as well
///\param packet The packet to buffer ///\param packet The packet to buffer
void InOutBase::bufferLivePacket(JSON::Value & packet) { void InOutBase::bufferLivePacket(DTSC::Packet & packet){
myMeta.vod = false;
myMeta.live = true;
//Store the trackid for easier access //Store the trackid for easier access
unsigned long tid = packet["trackid"].asInt(); unsigned long tid = packet.getTrackId();
//Do nothing if the trackid is invalid //Do nothing if the trackid is invalid
if (!tid) { if (!tid) {
INFO_MSG("Packet without trackid"); INFO_MSG("Packet without trackid");
@ -430,7 +439,7 @@ namespace Mist {
///\todo Figure out how to act with declined track here ///\todo Figure out how to act with declined track here
bool isKeyframe = false; bool isKeyframe = false;
if (myMeta.tracks[tid].type == "video") { if (myMeta.tracks[tid].type == "video") {
if (packet.isMember("keyframe") && packet["keyframe"]) { if (packet.hasMember("keyframe") && packet.getFlag("keyframe")) {
isKeyframe = true; isKeyframe = true;
} }
} else { } else {
@ -439,7 +448,7 @@ namespace Mist {
isKeyframe = true; isKeyframe = true;
} else { } else {
unsigned long lastKey = pagesByTrack[tid].rbegin()->second.lastKeyTime; unsigned long lastKey = pagesByTrack[tid].rbegin()->second.lastKeyTime;
if (packet["time"].asInt() - lastKey > 5000) { if (packet.getTime() - lastKey > 5000) {
isKeyframe = true; isKeyframe = true;
} }
} }
@ -463,7 +472,7 @@ namespace Mist {
pagesByTrack[tid][nextPageNum].dataSize = (25 * 1024 * 1024); pagesByTrack[tid][nextPageNum].dataSize = (25 * 1024 * 1024);
pagesByTrack[tid][nextPageNum].pageNum = nextPageNum; pagesByTrack[tid][nextPageNum].pageNum = nextPageNum;
} }
pagesByTrack[tid].rbegin()->second.lastKeyTime = packet["time"].asInt(); pagesByTrack[tid].rbegin()->second.lastKeyTime = packet.getTime();
pagesByTrack[tid].rbegin()->second.keyNum++; pagesByTrack[tid].rbegin()->second.keyNum++;
} }
//Set the pageNumber if it has not been set yet //Set the pageNumber if it has not been set yet
@ -535,6 +544,11 @@ namespace Mist {
} }
//Now we either returned or the track has an offset for the user page. //Now we either returned or the track has an offset for the user page.
//Get the data from the userPage //Get the data from the userPage
if (!userClient.getData()){
char userPageName[100];
sprintf(userPageName, SHM_USERS, streamName.c_str());
userClient = IPC::sharedClient(userPageName, 30, true);
}
char * tmp = userClient.getData(); char * tmp = userClient.getData();
if (!tmp) { if (!tmp) {
DEBUG_MSG(DLVL_FAIL, "Failed to negotiate for incoming track %lu, there does not seem to be a connection with the buffer", tid); DEBUG_MSG(DLVL_FAIL, "Failed to negotiate for incoming track %lu, there does not seem to be a connection with the buffer", tid);

View file

@ -36,6 +36,7 @@ namespace Mist {
void bufferFinalize(unsigned long tid); void bufferFinalize(unsigned long tid);
void bufferRemove(unsigned long tid, unsigned long pageNumber); void bufferRemove(unsigned long tid, unsigned long pageNumber);
void bufferLivePacket(JSON::Value & packet); void bufferLivePacket(JSON::Value & packet);
void bufferLivePacket(DTSC::Packet & packet);
bool isBuffered(unsigned long tid, unsigned long keyNum); bool isBuffered(unsigned long tid, unsigned long keyNum);
unsigned long bufferedOnPage(unsigned long tid, unsigned long keyNum); unsigned long bufferedOnPage(unsigned long tid, unsigned long keyNum);
protected: protected:
@ -53,15 +54,12 @@ namespace Mist {
DTSC::Meta myMeta;///< Stores either the input or output metadata DTSC::Meta myMeta;///< Stores either the input or output metadata
std::set<unsigned long> selectedTracks;///< Stores the track id's that are either selected for playback or input std::set<unsigned long> selectedTracks;///< Stores the track id's that are either selected for playback or input
std::map<unsigned long, std::map<unsigned long, DTSCPageData> > pagesByTrack;///<Holds the data for all pages of a track. Based on unmapped tids std::map<unsigned long, std::map<unsigned long, DTSCPageData> > pagesByTrack;///<Holds the data for all pages of a track. Based on unmapped tids
//Negotiation stuff (from unmapped tid's) //Negotiation stuff (from unmapped tid's)
std::map<unsigned long, unsigned long> trackOffset; ///< Offset of data on user page std::map<unsigned long, unsigned long> trackOffset; ///< Offset of data on user page
std::map<unsigned long, negotiationState> trackState; ///< State of the negotiation for tracks std::map<unsigned long, negotiationState> trackState; ///< State of the negotiation for tracks
std::map<unsigned long, unsigned long> trackMap;///<Determines which input track maps onto which "final" track std::map<unsigned long, unsigned long> trackMap;///<Determines which input track maps onto which "final" track
//Using mapped tid's
std::map<unsigned long, IPC::sharedPage> metaPages;///< For each track, holds the page that describes which dataPages are mapped std::map<unsigned long, IPC::sharedPage> metaPages;///< For each track, holds the page that describes which dataPages are mapped
std::map<unsigned long, unsigned long> curPageNum;///< For each track, holds the number page that is currently being written. std::map<unsigned long, unsigned long> curPageNum;///< For each track, holds the number page that is currently being written.
std::map<unsigned long, IPC::sharedPage> curPage;///< For each track, holds the page that is currently being written. std::map<unsigned long, IPC::sharedPage> curPage;///< For each track, holds the page that is currently being written.

View file

@ -966,15 +966,10 @@ namespace Mist {
} }
} }
if (!trackMap.size()){ if (!trackMap.size()){
IPC::userConnection userConn(userClient.getData());
for (std::set<unsigned long>::iterator it = selectedTracks.begin(); it != selectedTracks.end() && tNum < SIMUL_TRACKS; it++){ for (std::set<unsigned long>::iterator it = selectedTracks.begin(); it != selectedTracks.end() && tNum < SIMUL_TRACKS; it++){
unsigned int tId = *it; userConn.setTrackId(tNum, *it);
char * thisData = userClient.getData() + (6 * tNum); userConn.setKeynum(tNum, nxtKeyNum[*it]);
thisData[0] = ((tId >> 24) & 0xFF);
thisData[1] = ((tId >> 16) & 0xFF);
thisData[2] = ((tId >> 8) & 0xFF);
thisData[3] = ((tId) & 0xFF);
thisData[4] = ((nxtKeyNum[tId] >> 8) & 0xFF);
thisData[5] = ((nxtKeyNum[tId]) & 0xFF);
tNum ++; tNum ++;
} }
} }