diff --git a/src/input/input.cpp b/src/input/input.cpp index 59578ad9..6f2cea97 100644 --- a/src/input/input.cpp +++ b/src/input/input.cpp @@ -296,6 +296,10 @@ namespace Mist { DEBUG_MSG(DLVL_HIGH, "Playing from %ld to %llu", myMeta.tracks[track].keys[pageNum-1].getTime(), stopTime); it->second.curOffset = 0; getNext(); + //in case earlier seeking was inprecise, seek to the exact point + while (lastPack && lastPack.getTime() < myMeta.tracks[track].keys[pageNum-1].getTime()){ + getNext(); + } while (lastPack && lastPack.getTime() < stopTime){ if (it->second.curOffset + lastPack.getDataLen() > pagesByTrack[track][pageNum].dataSize){ DEBUG_MSG(DLVL_WARN, "Trying to write %u bytes on pos %llu where size is %llu (time: %llu / %llu, track %u page %u)", lastPack.getDataLen(), it->second.curOffset, pagesByTrack[track][pageNum].dataSize, lastPack.getTime(), stopTime, track, pageNum); diff --git a/src/input/input_buffer.cpp b/src/input/input_buffer.cpp index ba96aaca..7408f58d 100644 --- a/src/input/input_buffer.cpp +++ b/src/input/input_buffer.cpp @@ -53,6 +53,7 @@ namespace Mist { myMeta.vod = false; myMeta.live = true; myMeta.writeTo(metaPage.mapped); + memset(metaPage.mapped+myMeta.getSendLen(), 0, metaPage.len > myMeta.getSendLen() ? std::min(metaPage.len-myMeta.getSendLen(), 4ll) : 0); } bool inputBuffer::removeKey(unsigned int tid){ @@ -124,97 +125,140 @@ namespace Mist { } void inputBuffer::userCallback(char * data, size_t len, unsigned int id) { - unsigned long tmp = ((long)(data[0]) << 24) | ((long)(data[1]) << 16) | ((long)(data[2]) << 8) | ((long)(data[3])); - if (tmp & 0x80000000) { - //Track is set to "New track request", assign new track id and create shared memory page - unsigned long tNum = (givenTracks.size() ? (*givenTracks.rbegin()) : 0) + 1; - ///\todo Neatify this - data[0] = (tNum >> 24) & 0xFF; - data[1] = (tNum >> 16) & 0xFF; - data[2] = (tNum >> 8) & 0xFF; - data[3] = (tNum) & 0xFF; - givenTracks.insert(tNum); - char tmpChr[100]; - long tmpLen = sprintf(tmpChr, "liveStream_%s%lu", config->getString("streamname").c_str(), tNum); - metaPages[tNum].init(std::string(tmpChr, tmpLen), 8388608, true); - } else { - unsigned long tNum = ((long)(data[0]) << 24) | ((long)(data[1]) << 16) | ((long)(data[2]) << 8) | ((long)(data[3])); - if (!myMeta.tracks.count(tNum)) { - DEBUG_MSG(DLVL_DEVEL, "Tracknum not in meta: %lu, from user %u", tNum, id); - if (metaPages[tNum].mapped) { - if (metaPages[tNum].mapped[0] == 'D' && metaPages[tNum].mapped[1] == 'T') { - unsigned int len = ntohl(((int *)metaPages[tNum].mapped)[1]); - unsigned int i = 0; - JSON::Value tmpMeta; - JSON::fromDTMI((const unsigned char *)metaPages[tNum].mapped + 8, len, i, tmpMeta); - DTSC::Meta tmpTrack(tmpMeta); - int oldTNum = tmpTrack.tracks.begin()->first; - bool collision = false; - for (std::map::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++) { - if (it->first == tNum) { - continue; - } - if (it->second.getIdentifier() == tmpTrack.tracks[oldTNum].getIdentifier()) { - collision = true; - break; - } + static int nextTempId = 1001; + char counter = (*(data - 1)); + for (int index = 0; index < 5; index++){ + char* thisData = data + (index * 6); + unsigned long value = ((long)(thisData[0]) << 24) | ((long)(thisData[1]) << 16) | ((long)(thisData[2]) << 8) | thisData[3]; + if (value == 0xFFFFFFFF){ + //Skip value 0xFFFFFFFF as this indicates a previously declined track + continue; + } + if (counter == 126 || counter == 127 || counter == 254 || counter == 255){ + if (negotiateTracks.count(value)){ + negotiateTracks.erase(value); + metaPages.erase(value); + } + if (givenTracks.count(value)){ + givenTracks.erase(value); + } + continue; + } + if (value & 0x80000000){ + //Track is set to "New track request", assign new track id and create shared memory page + int tmpTid = nextTempId++; + negotiateTracks.insert(tmpTid); + thisData[0] = (tmpTid >> 24) & 0xFF; + thisData[1] = (tmpTid >> 16) & 0xFF; + thisData[2] = (tmpTid >> 8) & 0xFF; + thisData[3] = (tmpTid) & 0xFF; + unsigned long tNum = ((long)(thisData[4]) << 8) | thisData[5]; + INFO_MSG("Assigning temporary ID %d to incoming track %lu for user %d", tmpTid, tNum, id); + + char tempMetaName[100]; + sprintf(tempMetaName, "liveStream_%s%d", config->getString("streamname").c_str(), tmpTid); + metaPages[tmpTid].init(tempMetaName, 8388608, true); + } + if (negotiateTracks.count(value)){ + INFO_MSG("Negotiating %lu", value); + //Track is currently under negotiation, check whether the metadata has been submitted + if (metaPages[value].mapped){ + INFO_MSG("Mapped %lu", value); + unsigned int len = ntohl(((int *)metaPages[value].mapped)[1]); + unsigned int i = 0; + JSON::Value JSONMeta; + JSON::fromDTMI((const unsigned char *)metaPages[value].mapped + 8, len, i, JSONMeta); + DTSC::Meta tmpMeta(JSONMeta); + if (!tmpMeta.tracks.count(value)){//Track not yet added + continue; + } + + std::string tempId = tmpMeta.tracks.begin()->second.getIdentifier(); + DEBUG_MSG(DLVL_DEVEL, "Attempting colision detection for track %s", tempId.c_str()); + int finalMap = -1; + for (std::map::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++) { + if (it->second.type == "video"){ + finalMap = 1; } - if (collision) { - /// \todo Erasing page for now, should do more here - DEBUG_MSG(DLVL_DEVEL, "Collision detected! Erasing page for now, should do more here"); - metaPages.erase(tNum); - data[0] = 0xFF; - data[1] = 0xFF; - data[2] = 0xFF; - data[3] = 0xFF; - } else { - if (!myMeta.tracks.count(tNum)) { - myMeta.tracks[tNum] = tmpTrack.tracks[oldTNum]; - data[4] = 0x00; - data[5] = 0x00; - updateMeta(); - char firstPage[100]; - sprintf(firstPage, "%s%lu", config->getString("streamname").c_str(), tNum); - indexPages[tNum].init(firstPage, 8192, true); - ((long long int *)indexPages[tNum].mapped)[0] = htonl(1000); - ///\todo Fix for non-first-key-pushing - sprintf(firstPage, "%s%lu_0", config->getString("streamname").c_str(), tNum); - ///\todo Make size dynamic / other solution. 25mb is too much. - dataPages[tNum][0].init(firstPage, 26214400, true); - } + if (it->second.type == "audio"){ + finalMap = 2; } } + //Remove the "negotiate" status in either case + negotiateTracks.erase(value); + metaPages.erase(value); + if (finalMap != -1 && givenTracks.count(finalMap)) { + DEBUG_MSG(DLVL_DEVEL, "Collision of new track %lu with track %d detected! Declining track", value, finalMap); + thisData[0] = 0xFF; + thisData[1] = 0xFF; + thisData[2] = 0xFF; + thisData[3] = 0xFF; + } else { + if (finalMap == -1){ + DEBUG_MSG(DLVL_DEVEL, "Invalid track type detected, discarding"); + continue; + }else{ + //Resume either if we have more than 1 keyframe on the replacement track (assume it was already pushing before the track "dissapeared" + //or if the firstms of the replacement track is later than the lastms on the existing track + if (tmpMeta.tracks.begin()->second.keys.size() > 1|| tmpMeta.tracks.begin()->second.firstms >= myMeta.tracks[finalMap].lastms){ + DEBUG_MSG(DLVL_DEVEL, "Allowing negotiation track %lu, from user %u, to resume pushing final track number %d", value, id, finalMap); + }else{ + //Otherwise replace existing track + DEBUG_MSG(DLVL_DEVEL, "Re-push initiated for track %lu, from user %u, will replace final track number %d", value, id, finalMap); + myMeta.tracks.erase(finalMap); + } + } + givenTracks.insert(finalMap); + if (!myMeta.tracks.count(finalMap)){ + myMeta.tracks[finalMap] = tmpMeta.tracks.begin()->second; + myMeta.tracks[finalMap].trackID = finalMap; + } + thisData[0] = (finalMap >> 24) & 0xFF; + thisData[1] = (finalMap >> 16) & 0xFF; + thisData[2] = (finalMap >> 8) & 0xFF; + thisData[3] = (finalMap) & 0xFF; + int keyNum = myMeta.tracks[finalMap].keys.size(); + thisData[4] = (keyNum >> 8) & 0xFF; + thisData[5] = keyNum & 0xFF; + updateMeta(); + char firstPage[100]; + sprintf(firstPage, "%s%d", config->getString("streamname").c_str(), finalMap); + indexPages[finalMap].init(firstPage, 8192, true); + ((long long int *)indexPages[finalMap].mapped)[0] = htonl(1000); + sprintf(firstPage, "%s%d_%d", config->getString("streamname").c_str(), finalMap, keyNum); + ///\todo Make size dynamic / other solution. 25mb is too much. + dataPages[finalMap][0].init(firstPage, 26214400, true); + } } - } else { + } + if (givenTracks.count(value)){ //First check if the previous page has been finished: - if (!inputLoc[tNum].count(dataPages[tNum].rbegin()->first) || !inputLoc[tNum][dataPages[tNum].rbegin()->first].curOffset){ - if (dataPages[tNum].size() > 1){ - int prevPage = (++dataPages[tNum].rbegin())->first; - //update previous page. - updateMetaFromPage(tNum, prevPage); + if (!inputLoc[value].count(dataPages[value].rbegin()->first) || !inputLoc[value][dataPages[value].rbegin()->first].curOffset){ + if (dataPages[value].size() > 1){ + int previousPage = (++dataPages[value].rbegin())->first; + updateMetaFromPage(value, previousPage); } } //update current page - int curPage = dataPages[tNum].rbegin()->first; - updateMetaFromPage(tNum, curPage); - if (inputLoc[tNum][curPage].curOffset > 8388608) { - //create new page is > 8MB - int nxtPage = curPage + inputLoc[tNum][curPage].keyNum; + int currentPage = dataPages[value].rbegin()->first; + updateMetaFromPage(value, currentPage); + if (inputLoc[value][currentPage].curOffset > 8388608) { + int nextPage = currentPage + inputLoc[value][currentPage].keyNum; char nextPageName[100]; - sprintf(nextPageName, "%s%lu_%d", config->getString("streamname").c_str(), tNum, nxtPage); - dataPages[tNum][nxtPage].init(nextPageName, 20971520, true); + sprintf(nextPageName, "%s%lu_%d", config->getString("streamname").c_str(), value, nextPage); + dataPages[value][nextPage].init(nextPageName, 20971520, true); bool createdNew = false; for (int i = 0; i < 8192; i += 8){ - int thisKeyNum = ((((long long int *)(indexPages[tNum].mapped + i))[0]) >> 32) & 0xFFFFFFFF; - if (thisKeyNum == htonl(curPage)){ - if((ntohl((((long long int*)(indexPages[tNum].mapped + i))[0]) & 0xFFFFFFFF) == 1000)){ - ((long long int *)(indexPages[tNum].mapped + i))[0] &= 0xFFFFFFFF00000000ull; - ((long long int *)(indexPages[tNum].mapped + i))[0] |= htonl(inputLoc[tNum][curPage].keyNum); + int thisKeyNum = ((((long long int *)(indexPages[value].mapped + i))[0]) >> 32) & 0xFFFFFFFF; + if (thisKeyNum == htonl(currentPage)){ + if((ntohl((((long long int*)(indexPages[value].mapped + i))[0]) & 0xFFFFFFFF) == 1000)){ + ((long long int *)(indexPages[value].mapped + i))[0] &= 0xFFFFFFFF00000000ull; + ((long long int *)(indexPages[value].mapped + i))[0] |= htonl(inputLoc[value][currentPage].keyNum); } } - if (!createdNew && (((long long int*)(indexPages[tNum].mapped + i))[0]) == 0){ + if (!createdNew && (((long long int*)(indexPages[value].mapped + i))[0]) == 0){ createdNew = true; - ((long long int *)(indexPages[tNum].mapped + i))[0] = (((long long int)htonl(nxtPage)) << 32) | htonl(1000); + ((long long int *)(indexPages[value].mapped + i))[0] = (((long long int)htonl(nextPage)) << 32) | htonl(1000); } } } diff --git a/src/input/input_buffer.h b/src/input/input_buffer.h index 20764f4d..4b913f1a 100644 --- a/src/input/input_buffer.h +++ b/src/input/input_buffer.h @@ -21,8 +21,10 @@ namespace Mist { bool removeKey(unsigned int tid); void removeUnused(); void userCallback(char * data, size_t len, unsigned int id); + std::set negotiateTracks; std::set givenTracks; std::map metaPages; + ///Maps trackid to a pagenum->pageData map std::map > inputLoc; inputBuffer * singleton; }; diff --git a/src/output/output.cpp b/src/output/output.cpp index 98758270..f7eecbbd 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -78,6 +78,142 @@ namespace Mist { isInitialized = false; myConn.close(); } + + void Output::negotiateWithBuffer(int tid){ + //Check whether the track exists + if (!meta_out.tracks.count(tid)) { + return; + } + //Do not re-negotiate already confirmed tracks + if (trackMap.count(tid)){ + return; + } + //Do not re-negotiate if already at maximum for push tracks + if (trackMap.size() >= 5){ + DEBUG_MSG(DLVL_FAIL, "Failed to negotiate for incoming track %d, already at maximum number of tracks", tid); + return; + } + + char * tmp = playerConn.getData(); + if (!tmp){ + DEBUG_MSG(DLVL_FAIL, "Failed to negotiate for incoming track %d, there does not seem to be a connection with the buffer", tid); + return; + } + int bufConnOffset = trackMap.size(); + DEBUG_MSG(DLVL_DEVEL, "Starting negotiation for incoming track %d, at offset %d", tid, bufConnOffset); + memset(tmp + 6 * bufConnOffset, 0, 6); + tmp[6 * bufConnOffset] = 0x80; + tmp[6 * bufConnOffset + 4] = 0xFF; + tmp[6 * bufConnOffset + 5] = 0xFF; + playerConn.keepAlive(); + int newTid = 0x80000000; + while (newTid == 0x80000000){ + Util::sleep(100); + newTid = ((long)(tmp[6 * bufConnOffset]) << 24) | ((long)(tmp[6 * bufConnOffset + 1]) << 16) | ((long)(tmp[6 * bufConnOffset + 2]) << 8) | tmp[6 * bufConnOffset + 3]; + } + DEBUG_MSG(DLVL_DEVEL, "Track %d temporarily mapped to %d", tid, newTid); + + char pageName[100]; + sprintf(pageName, "liveStream_%s%d", streamName.c_str(), newTid); + metaPages[newTid].init(pageName, 8 * 1024 * 1024); + DTSC::Meta tmpMeta = meta_out; + tmpMeta.tracks.clear(); + tmpMeta.tracks[newTid] = meta_out.tracks[tid]; + tmpMeta.tracks[newTid].trackID = newTid; + JSON::Value tmpVal = tmpMeta.toJSON(); + std::string tmpStr = tmpVal.toNetPacked(); + memcpy(metaPages[newTid].mapped, tmpStr.data(), tmpStr.size()); + DEBUG_MSG(DLVL_DEVEL, "Temporary metadata written for incoming track %d, handling as track %d", tid, newTid); + + unsigned short firstPage = 0xFFFF; + unsigned int finalTid = newTid; + while (firstPage == 0xFFFF){ + DEBUG_MSG(DLVL_DEVEL, "Re-checking at offset %d", bufConnOffset); + Util::sleep(100); + finalTid = ((long)(tmp[6 * bufConnOffset]) << 24) | ((long)(tmp[6 * bufConnOffset + 1]) << 16) | ((long)(tmp[6 * bufConnOffset + 2]) << 8) | tmp[6 * bufConnOffset + 3]; + firstPage = ((long)(tmp[6 * bufConnOffset + 4]) << 8) | tmp[6 * bufConnOffset + 5]; + if (finalTid == 0xFFFFFFFF){ + DEBUG_MSG(DLVL_DEVEL, "Buffer has declined incoming track %d", tid); + return; + } + } + //Reinitialize so we make sure we got the right values here + finalTid = ((long)(tmp[6 * bufConnOffset]) << 24) | ((long)(tmp[6 * bufConnOffset + 1]) << 16) | ((long)(tmp[6 * bufConnOffset + 2]) << 8) | tmp[6 * bufConnOffset + 3]; + firstPage = ((long)(tmp[6 * bufConnOffset + 4]) << 8) | tmp[6 * bufConnOffset + 5]; + if (finalTid == 0xFFFFFFFF){ + DEBUG_MSG(DLVL_DEVEL, "Buffer has declined incoming track %d", tid); + memset(tmp + 6 * bufConnOffset, 0, 6); + return; + } + + DEBUG_MSG(DLVL_DEVEL, "Buffer accepted incoming track %d, temporary mapping %d as final mapping %d", tid, newTid, finalTid); + DEBUG_MSG(DLVL_DEVEL, "Buffer has indicated that incoming track %d should start writing on track %d, page %d", tid, finalTid, firstPage); + memset(pageName, 0, 100); + sprintf(pageName, "%s%d_%d", streamName.c_str(), finalTid, firstPage); + curPages[finalTid].init(pageName, 8 * 1024 * 1024); + trackMap[tid] = finalTid; + bookKeeping[finalTid] = DTSCPageData(); + DEBUG_MSG(DLVL_DEVEL, "Done negotiating for incoming track %d", tid); + } + + + void Output::negotiatePushTracks() { + int i = 0; + for (std::map::iterator it = meta_out.tracks.begin(); it != meta_out.tracks.end() && i < 5; it++){ + negotiateWithBuffer(it->first); + i++; + } + } + + void Output::bufferPacket(JSON::Value & pack){ + if (myMeta.tracks[pack["trackid"].asInt()].type != "video"){ + if ((pack["time"].asInt() - bookKeeping[trackMap[pack["trackid"].asInt()]].lastKeyTime) >= 5000){ + pack["keyframe"] = 1LL; + bookKeeping[trackMap[pack["trackid"].asInt()]].lastKeyTime = pack["time"].asInt(); + } + } + if (pack["trackid"].asInt() == 0){ + return; + } + //Re-negotiate declined tracks on each keyframe, to compensate for "broken" tracks + if (!trackMap.count(pack["trackid"].asInt()) || !trackMap[pack["trackid"].asInt()]){ + if (pack.isMember("keyframe") && pack["keyframe"]){ + negotiateWithBuffer(pack["trackid"].asInt()); + } + } + if (!trackMap.count(pack["trackid"].asInt()) || !trackMap[pack["trackid"].asInt()]){ + //declined track; + return; + } + pack["trackid"] = trackMap[pack["trackid"].asInt()]; + long long unsigned int tNum = pack["trackid"].asInt(); + if (!bookKeeping.count(tNum)){ + return; + } + int pageNum = bookKeeping[tNum].pageNum; + std::string tmp = pack.toNetPacked(); + if (bookKeeping[tNum].curOffset > 8388608 && pack.isMember("keyframe") && pack["keyframe"]){ + Util::sleep(500); + //open new page + char nextPage[100]; + sprintf(nextPage, "%s%llu_%d", streamName.c_str(), tNum, bookKeeping[tNum].pageNum + bookKeeping[tNum].keyNum); + curPages[tNum].init(nextPage, 26 * 1024 * 1024); + bookKeeping[tNum].pageNum += bookKeeping[tNum].keyNum; + bookKeeping[tNum].keyNum = 0; + bookKeeping[tNum].curOffset = 0; + } + if (bookKeeping[tNum].curOffset + tmp.size() < curPages[tNum].len){ + bookKeeping[tNum].keyNum += (pack.isMember("keyframe") && pack["keyframe"]); + memcpy(curPages[tNum].mapped + bookKeeping[tNum].curOffset, tmp.data(), tmp.size()); + bookKeeping[tNum].curOffset += tmp.size(); + }else{ + bookKeeping[tNum].curOffset += tmp.size(); + DEBUG_MSG(DLVL_WARN, "Can't buffer frame on page %d, track %llu, time %lld, keyNum %d, offset %llu", pageNum, tNum, pack["time"].asInt(), bookKeeping[tNum].pageNum + bookKeeping[tNum].keyNum, bookKeeping[tNum].curOffset); + } + playerConn.keepAlive(); + } + + void Output::initialize(){ if (isInitialized){ @@ -392,6 +528,7 @@ namespace Mist { } } DEBUG_MSG(DLVL_MEDIUM, "MistOut client handler shutting down: %s, %s, %s", myConn.connected() ? "conn_active" : "conn_closed", wantRequest ? "want_request" : "no_want_request", parseData ? "parsing_data" : "not_parsing_data"); + playerConn.finish(); myConn.close(); return 0; } diff --git a/src/output/output.h b/src/output/output.h index 7208b65f..e2f0f45f 100644 --- a/src/output/output.h +++ b/src/output/output.h @@ -25,7 +25,18 @@ namespace Mist { unsigned int offset; }; - /// The output class is intended to be inherited by MistOut process classes. + struct DTSCPageData { + DTSCPageData() : pageNum(0), keyNum(0), partNum(0), dataSize(0), curOffset(0), firstTime(0), lastKeyTime(-5000){} + int pageNum;/// preBuf; + std::map trackMap; + std::map metaPages; + std::map bookKeeping; }; } diff --git a/src/output/output_rtmp.cpp b/src/output/output_rtmp.cpp index d2c8c9da..9b3c88ce 100644 --- a/src/output/output_rtmp.cpp +++ b/src/output/output_rtmp.cpp @@ -587,147 +587,6 @@ namespace Mist { #endif } //parseAMFCommand - void OutRTMP::bufferPacket(JSON::Value & pack){ - if (!trackMap.count(pack["trackid"].asInt())){ - //declined track; - return; - } - if (myMeta.tracks[pack["trackid"].asInt()].type != "video"){ - if ((pack["time"].asInt() - bookKeeping[trackMap[pack["trackid"].asInt()]].lastKeyTime) >= 5000){ - pack["keyframe"] = 1LL; - bookKeeping[trackMap[pack["trackid"].asInt()]].lastKeyTime = pack["time"].asInt(); - } - } - pack["trackid"] = trackMap[pack["trackid"].asInt()]; - long long unsigned int tNum = pack["trackid"].asInt(); - if (!bookKeeping.count(tNum)){ - return; - } - int pageNum = bookKeeping[tNum].pageNum; - std::string tmp = pack.toNetPacked(); - if (bookKeeping[tNum].curOffset > 8388608 && pack.isMember("keyframe") && pack["keyframe"]){ - Util::sleep(500); - //open new page - char nextPage[100]; - sprintf(nextPage, "%s%llu_%d", streamName.c_str(), tNum, bookKeeping[tNum].pageNum + bookKeeping[tNum].keyNum); - curPages[tNum].init(nextPage, 26 * 1024 * 1024); - bookKeeping[tNum].pageNum += bookKeeping[tNum].keyNum; - bookKeeping[tNum].keyNum = 0; - bookKeeping[tNum].curOffset = 0; - } - if (bookKeeping[tNum].curOffset + tmp.size() < curPages[tNum].len){ - bookKeeping[tNum].keyNum += (pack.isMember("keyframe") && pack["keyframe"]); - memcpy(curPages[tNum].mapped + bookKeeping[tNum].curOffset, tmp.data(), tmp.size()); - bookKeeping[tNum].curOffset += tmp.size(); - }else{ - bookKeeping[tNum].curOffset += tmp.size(); - DEBUG_MSG(DLVL_WARN, "Can't buffer frame on page %d, track %llu, time %lld, keyNum %d, offset %llu", pageNum, tNum, pack["time"].asInt(), bookKeeping[tNum].pageNum + bookKeeping[tNum].keyNum, bookKeeping[tNum].curOffset); - ///\todo Open next page plx - } - playerConn.keepAlive(); - } - - - void OutRTMP::negotiatePushTracks() { - char * tmp = playerConn.getData(); - if (!tmp){ - DEBUG_MSG(DLVL_FAIL, "No userpage allocated"); - return; - } - memset(tmp, 0, 30); - unsigned int i = 0; - for (std::map::iterator it = meta_out.tracks.begin(); it != meta_out.tracks.end() && i < 5; it++){ - DEBUG_MSG(DLVL_DEVEL, "Negotiating tracknum for id %d", it->first); - (tmp + 6 * i)[0] = 0x80; - (tmp + 6 * i)[1] = 0x00; - (tmp + 6 * i)[2] = 0x00; - (tmp + 6 * i)[3] = 0x00; - (tmp + 6 * i)[4] = (it->first >> 8) & 0xFF; - (tmp + 6 * i)[5] = (it->first) & 0xFF; - i++; - } - playerConn.keepAlive(); - bool gotAllNumbers = false; - while (!gotAllNumbers){ - Util::sleep(100); - gotAllNumbers = true; - i = 0; - for (std::map::iterator it = meta_out.tracks.begin(); it != meta_out.tracks.end() && i < 5; it++){ - unsigned long tNum = (((long)(tmp + (6 * i))[0]) << 24) | (((long)(tmp + (6 * i))[1]) << 16) | (((long)(tmp + (6 * i))[2]) << 8) | (long)(tmp + (6 * i))[3]; - unsigned short oldNum = (((long)(tmp + (6 * i))[4]) << 8) | (long)(tmp + (6 * i))[5]; - if( tNum & 0x80000000){ - gotAllNumbers = false; - break; - }else{ - DEBUG_MSG(DLVL_DEVEL, "Mapped %d -> %lu", oldNum, tNum); - trackMap[oldNum] = tNum; - } - i++; - } - } - for (std::map::iterator it = trackMap.begin(); it != trackMap.end(); it++){ - char tmp[100]; - sprintf( tmp, "liveStream_%s%d", streamName.c_str(), it->second); - metaPages[it->second].init(std::string(tmp), 8 * 1024 * 1024); - DTSC::Meta tmpMeta = meta_out; - tmpMeta.tracks.clear(); - tmpMeta.tracks[it->second] = meta_out.tracks[it->first]; - tmpMeta.tracks[it->second].trackID = it->second; - JSON::Value tmpVal = tmpMeta.toJSON(); - std::string tmpStr = tmpVal.toNetPacked(); - memcpy(metaPages[it->second].mapped, tmpStr.data(), tmpStr.size()); - DEBUG_MSG(DLVL_DEVEL, "Written meta for track %d", it->second); - } - gotAllNumbers = false; - while (!gotAllNumbers){ - Util::sleep(100); - gotAllNumbers = true; - i = 0; - unsigned int j = 0; - //update Metadata; - JSON::Value jsonMeta; - JSON::fromDTMI((const unsigned char*)streamIndex.mapped + 8, streamIndex.len - 8, j, jsonMeta); - myMeta = DTSC::Meta(jsonMeta); - tmp = playerConn.getData(); - for (std::map::iterator it = meta_out.tracks.begin(); it != meta_out.tracks.end() && i < 5; it++){ - unsigned long tNum = (((long)(tmp + (6 * i))[0]) << 24) | (((long)(tmp + (6 * i))[1]) << 16) | (((long)(tmp + (6 * i))[2]) << 8) | (long)(tmp + (6 * i))[3]; - if( tNum == 0xFFFFFFFF){ - DEBUG_MSG(DLVL_DEVEL, "Skipping a declined track"); - i++; - continue; - } - if(!myMeta.tracks.count(tNum)){ - gotAllNumbers = false; - break; - } - i++; - } - } - i = 0; - tmp = playerConn.getData(); - for (std::map::iterator it = meta_out.tracks.begin(); it != meta_out.tracks.end() && i < 5; it++){ - unsigned long tNum = ((long)(tmp[6*i]) << 24) | ((long)(tmp[6 * i + 1]) << 16) | ((long)(tmp[6 * i + 2]) << 8) | tmp[6 * i + 3]; - if( tNum == 0xFFFFFFFF){ - tNum = ((long)(tmp[6 * i + 4]) << 8) | (long)tmp[6 * i + 5]; - DEBUG_MSG(DLVL_WARN, "Buffer declined track %i", trackMap[tNum]); - trackMap.erase(tNum); - tmp[6*i] = 0; - tmp[6*i+1] = 0; - tmp[6*i+2] = 0; - tmp[6*i+3] = 0; - tmp[6*i+4] = 0; - tmp[6*i+5] = 0; - }else{ - char firstPage[100]; - sprintf(firstPage, "%s%lu_%d", streamName.c_str(), tNum, 0); - curPages[tNum].init(firstPage, 8 * 1024 * 1024); - bookKeeping[tNum] = DTSCPageData(); - DEBUG_MSG(DLVL_WARN, "Buffer accepted track %lu", tNum); - } - i++; - } - } - ///\brief Gets and parses one RTMP chunk at a time. ///\param inputBuffer A buffer filled with chunk data. void OutRTMP::parseChunk(Socket::Buffer & inputBuffer) { diff --git a/src/output/output_rtmp.h b/src/output/output_rtmp.h index 1e4df593..b262753d 100644 --- a/src/output/output_rtmp.h +++ b/src/output/output_rtmp.h @@ -5,18 +5,7 @@ namespace Mist { - struct DTSCPageData { - DTSCPageData() : pageNum(0), keyNum(0), partNum(0), dataSize(0), curOffset(0), firstTime(0), lastKeyTime(-5000){} - int pageNum;/// preBuf; - std::map trackMap; - std::map metaPages; - std::map bookKeeping; }; }