Better working TS input, Pro side. Code by Erik Zandvliet.

This commit is contained in:
Thulinma 2015-11-05 16:56:50 +01:00
parent 8bcda5e57b
commit 35b2dd6bee
15 changed files with 627 additions and 78 deletions

View file

@ -32,10 +32,11 @@ namespace Analysers {
if(std::cin.gcount() != 188){break;}
bytes += 188;
if(packet.FromPointer(packetPtr)){
//std::cout << packet.toPrettyString();
tsStream.parse(packet, bytes);
if (tsStream.hasPacketOnEachTrack()){
if (tsStream.hasPacket(packet.getPID())){
DTSC::Packet dtscPack;
tsStream.getEarliestPacket(dtscPack);
tsStream.getPacket(packet.getPID(), dtscPack);
std::cout << dtscPack.toJSON().toPrettyString();
}
}

View file

@ -204,7 +204,14 @@ namespace Mist {
char userPageName[NAME_BUFFER_SIZE];
snprintf(userPageName, NAME_BUFFER_SIZE, SHM_USERS, streamName.c_str());
#ifdef INPUT_LIVE
Util::startInput(streamName);
unsigned int giveUpCounter = 0;
while (!Util::startInput(streamName) && config->is_active && ++giveUpCounter < 20) {
Util::sleep(500);
}
if (giveUpCounter >= 20){
FAIL_MSG("Could not start buffer for stream '%s', aborting stream input!", streamName.c_str());
config->is_active = false;
}
userClient = IPC::sharedClient(userPageName, 30, true);
getNext();
while (thisPacket || config->is_active) {
@ -253,7 +260,6 @@ namespace Mist {
long long int activityCounter = Util::bootSecs();
while ((Util::bootSecs() - activityCounter) < 10 && config->is_active) { //10 second timeout
Util::wait(1000);
userPage.parseEach(callbackWrapper);
removeUnused();
if (userPage.amount) {
@ -273,6 +279,7 @@ namespace Mist {
}
}
/*LTS-END*/
Util::sleep(1000);
}
#endif
finish();

View file

@ -39,6 +39,7 @@ namespace Mist {
capa["source_match"] = "/*.ts";
capa["priority"] = 9ll;
capa["codecs"][0u][0u].append("H264");
capa["codecs"][0u][0u].append("HEVC");
capa["codecs"][0u][1u].append("AAC");
capa["codecs"][0u][1u].append("AC3");
@ -51,6 +52,13 @@ namespace Mist {
JSON::fromString("{\"arg\":\"integer\",\"value\":9876,\"short\":\"p\",\"long\":\"port\",\"help\":\"The udp port on which to listen for incoming UDP Packets.\"}"));
pushing = false;
inFile = NULL;
}
inputTS::~inputTS() {
if (inFile){
fclose(inFile);
}
}
///Setup of TS Input
@ -113,7 +121,7 @@ namespace Mist {
bool first = true;
long long int lastBpos = 0;
while (packet.FromFile(inFile)){
while (packet.FromFile(inFile) && !feof(inFile)){
tsStream.parse(packet, lastBpos);
lastBpos = ftell(inFile);
while(tsStream.hasPacketOnEachTrack()){
@ -128,10 +136,10 @@ namespace Mist {
}
fseek(inFile, 0, SEEK_SET);
std::ofstream oFile(std::string(config->getString("input") + ".dtsh").c_str());
oFile << myMeta.toJSON().toNetPacked();
oFile.close();
exit(1);
return true;
}
@ -143,16 +151,15 @@ namespace Mist {
void inputTS::getNext(bool smart){
thisPacket.null();
bool hasPacket = (selectedTracks.size() == 1 ? tsStream.hasPacket(*selectedTracks.begin()) : tsStream.hasPacketOnEachTrack());
if (!hasPacket && (pushing || !feof(inFile))){
TS::Packet tsBuf;
while (!hasPacket && (pushing || !feof(inFile)) && config->is_active){
if (!pushing) {
unsigned int bPos = ftell(inFile);
tsBuf.FromFile(inFile);
tsStream.parse(tsBuf, bPos);
if (selectedTracks.count(tsBuf.getPID())){
tsStream.parse(tsBuf, bPos);
}
}else{
while (udpCon.Receive()){
userClient.keepAlive();
udpDataBuffer.append(udpCon.data, udpCon.data_len);
while (udpDataBuffer.size() > 188 && (udpDataBuffer[0] != 0x47 || udpDataBuffer[188] != 0x47)){
size_t syncPos = udpDataBuffer.find("\107", 1);
@ -164,10 +171,23 @@ namespace Mist {
udpDataBuffer.erase(0,188);
}
}
if (userClient.getData()){
userClient.keepAlive();
}
Util::sleep(500);
}
if (userClient.getData()){
userClient.keepAlive();
}
hasPacket = (selectedTracks.size() == 1 ? tsStream.hasPacket(*selectedTracks.begin()) : tsStream.hasPacketOnEachTrack());
}
if (!hasPacket){
if(inFile && !feof(inFile)){
getNext();
}
if (pushing){
sleep(500);
}
return;
}
if (selectedTracks.size() == 1){
@ -180,10 +200,35 @@ namespace Mist {
getNext();
}
}
void inputTS::readPMT(){
//save current file position
int bpos = ftell(inFile);
if (fseek(inFile, 0, SEEK_SET)){
FAIL_MSG("Seek to 0 failed");
return;
}
TS::Packet tsBuffer;
while (!tsStream.hasPacketOnEachTrack() && tsBuffer.FromFile(inFile)){
tsStream.parse(tsBuffer, 0);
}
//Clear leaves the PMT in place
tsStream.clear();
//Restore original file position
if (fseek(inFile, bpos, SEEK_SET)){
return;
}
}
///Seeks to a specific time
void inputTS::seek(int seekTime){
tsStream.clear();
readPMT();
unsigned long seekPos = 0xFFFFFFFFull;
for (std::set<unsigned long>::iterator it = selectedTracks.begin(); it != selectedTracks.end(); it++){
unsigned long thisBPos = 0;

View file

@ -12,6 +12,7 @@ namespace Mist {
class inputTS : public Input {
public:
inputTS(Util::Config * cfg);
~inputTS();
protected:
//Private Functions
bool setup();
@ -19,6 +20,7 @@ namespace Mist {
void getNext(bool smart = true);
void seek(int seekTime);
void trackSelect(std::string trackSpec);
void readPMT();
FILE * inFile;///<The input file with ts data
TS::Stream tsStream;///<Used for parsing the incoming ts stream
@ -26,6 +28,8 @@ namespace Mist {
bool pushing;
Socket::UDPConnection udpCon;
std::string udpDataBuffer;
TS::Packet tsBuf;
};
}

View file

@ -300,7 +300,7 @@ namespace Mist {
MP4::UUID_SampleEncryption_Sample newSample;
thisPacket.getString("ivec", newSample.InitializationVector);
std::deque<int> nalSizes = h264::parseNalSizes(thisPacket);
std::deque<int> nalSizes = nalu::parseNalSizes(thisPacket);
for(std::deque<int>::iterator it = nalSizes.begin(); it != nalSizes.end(); it++){
int encrypted = (*it - 5) & ~0xF;//Bitmask to a multiple of 16
MP4::UUID_SampleEncryption_Sample_Entry newEntry;