Optimization that allows Socket::Buffer to write directly to a Util::ResizeablePointer
This commit is contained in:
parent
364441c435
commit
4df771eb02
7 changed files with 55 additions and 25 deletions
15
lib/dtsc.cpp
15
lib/dtsc.cpp
|
@ -130,21 +130,24 @@ namespace DTSC{
|
||||||
void Packet::reInit(Socket::Connection &src){
|
void Packet::reInit(Socket::Connection &src){
|
||||||
int sleepCount = 0;
|
int sleepCount = 0;
|
||||||
null();
|
null();
|
||||||
int toReceive = 0;
|
Util::ResizeablePointer ptr;
|
||||||
while (src.connected()){
|
while (src.connected()){
|
||||||
if (!toReceive && src.Received().available(8)){
|
if (!ptr.rsize() && src.Received().available(8)){
|
||||||
if (src.Received().copy(2) != "DT"){
|
if (src.Received().copy(2) != "DT"){
|
||||||
WARN_MSG("Invalid DTSC Packet header encountered (%s)",
|
WARN_MSG("Invalid DTSC Packet header encountered (%s)",
|
||||||
Encodings::Hex::encode(src.Received().copy(4)).c_str());
|
Encodings::Hex::encode(src.Received().copy(4)).c_str());
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
toReceive = Bit::btohl(src.Received().copy(8).data() + 4);
|
ptr.allocate(Bit::btohl(src.Received().copy(8).data() + 4) + 8);
|
||||||
}
|
}
|
||||||
if (toReceive && src.Received().available(toReceive + 8)){
|
unsigned int readable = src.Received().bytes(ptr.rsize() - ptr.size());
|
||||||
std::string dataBuf = src.Received().remove(toReceive + 8);
|
if (ptr.rsize() && readable){
|
||||||
reInit(dataBuf.data(), dataBuf.size());
|
src.Received().remove(ptr, readable);
|
||||||
|
if (ptr.size() == ptr.rsize()){
|
||||||
|
reInit(ptr, ptr.size());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
if (!src.spool()){
|
if (!src.spool()){
|
||||||
if (sleepCount++ > 750){
|
if (sleepCount++ > 750){
|
||||||
WARN_MSG("Waiting for packet on connection timed out");
|
WARN_MSG("Waiting for packet on connection timed out");
|
||||||
|
|
|
@ -473,6 +473,25 @@ std::string Socket::Buffer::remove(unsigned int count){
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Removes count bytes from the buffer, appending them to the given ptr.
|
||||||
|
/// Does nothing if not all count bytes are available.
|
||||||
|
void Socket::Buffer::remove(Util::ResizeablePointer & ptr, unsigned int count){
|
||||||
|
size();
|
||||||
|
if (!available(count)){return;}
|
||||||
|
unsigned int i = 0;
|
||||||
|
for (std::deque<std::string>::reverse_iterator it = data.rbegin(); it != data.rend(); ++it){
|
||||||
|
if (i + (*it).size() < count){
|
||||||
|
ptr.append(*it);
|
||||||
|
i += (*it).size();
|
||||||
|
(*it).clear();
|
||||||
|
}else{
|
||||||
|
ptr.append(it->data(), count - i);
|
||||||
|
(*it).erase(0, count - i);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Copies count bytes from the buffer, returning them by value.
|
/// Copies count bytes from the buffer, returning them by value.
|
||||||
/// Returns an empty string if not all count bytes are available.
|
/// Returns an empty string if not all count bytes are available.
|
||||||
std::string Socket::Buffer::copy(unsigned int count){
|
std::string Socket::Buffer::copy(unsigned int count){
|
||||||
|
|
|
@ -26,6 +26,8 @@
|
||||||
#include "mbedtls/ssl.h"
|
#include "mbedtls/ssl.h"
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
#include "util.h"
|
||||||
|
|
||||||
// for being friendly with Socket::Connection down below
|
// for being friendly with Socket::Connection down below
|
||||||
namespace Buffer{
|
namespace Buffer{
|
||||||
class user;
|
class user;
|
||||||
|
@ -68,6 +70,7 @@ namespace Socket{
|
||||||
bool available(unsigned int count);
|
bool available(unsigned int count);
|
||||||
bool available(unsigned int count) const;
|
bool available(unsigned int count) const;
|
||||||
std::string remove(unsigned int count);
|
std::string remove(unsigned int count);
|
||||||
|
void remove(Util::ResizeablePointer & ptr, unsigned int count);
|
||||||
std::string copy(unsigned int count);
|
std::string copy(unsigned int count);
|
||||||
void clear();
|
void clear();
|
||||||
};
|
};
|
||||||
|
|
|
@ -338,8 +338,9 @@ namespace HTTP{
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
std::string buf = downer.getSocket().Received().remove(s);
|
Util::ResizeablePointer buf;
|
||||||
cb.dataCallback(buf.data(), s);
|
downer.getSocket().Received().remove(buf, s);
|
||||||
|
cb.dataCallback(buf, s);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -156,19 +156,17 @@ namespace HTTP{
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
C.Received().remove(headSize); // delete the header
|
C.Received().remove(headSize); // delete the header
|
||||||
std::string pl = C.Received().remove(payLen);
|
|
||||||
if (masked){
|
|
||||||
// If masked, apply the mask to the payload
|
|
||||||
const char *mask = head.data() + headSize - 4; // mask is last 4 bytes of header
|
|
||||||
for (uint32_t i = 0; i < payLen; ++i){pl[i] ^= mask[i % 4];}
|
|
||||||
}
|
|
||||||
if ((head[0] & 0xF)){
|
if ((head[0] & 0xF)){
|
||||||
// Non-continuation
|
// Non-continuation
|
||||||
frameType = (head[0] & 0xF);
|
frameType = (head[0] & 0xF);
|
||||||
data.assign(pl.data(), pl.size());
|
data.truncate(0);
|
||||||
}else{
|
}
|
||||||
// Continuation
|
size_t preSize = data.size();
|
||||||
data.append(pl.data(), pl.size());
|
C.Received().remove(data, payLen);
|
||||||
|
if (masked){
|
||||||
|
// If masked, apply the mask to the payload
|
||||||
|
const char *mask = head.data() + headSize - 4; // mask is last 4 bytes of header
|
||||||
|
for (uint32_t i = 0; i < payLen; ++i){data[i+preSize] ^= mask[i % 4];}
|
||||||
}
|
}
|
||||||
if (head[0] & 0x80){
|
if (head[0] & 0x80){
|
||||||
// FIN
|
// FIN
|
||||||
|
|
|
@ -30,11 +30,15 @@ AnalyserDTSC::AnalyserDTSC(Util::Config &conf) : Analyser(conf){
|
||||||
|
|
||||||
bool AnalyserDTSC::parsePacket(){
|
bool AnalyserDTSC::parsePacket(){
|
||||||
if (headLess){
|
if (headLess){
|
||||||
|
Util::ResizeablePointer dataBuf;
|
||||||
while (conn){
|
while (conn){
|
||||||
if (!conn.spool()){Util::sleep(50);}
|
if (conn.spool()){
|
||||||
|
conn.Received().remove(dataBuf, conn.Received().bytes(0xFFFFFFFFul));
|
||||||
|
}else{
|
||||||
|
Util::sleep(50);
|
||||||
}
|
}
|
||||||
std::string dataBuf = conn.Received().remove(conn.Received().bytes(0xFFFFFFFFul));
|
}
|
||||||
DTSC::Scan S((char *)dataBuf.data(), dataBuf.size());
|
DTSC::Scan S(dataBuf, dataBuf.size());
|
||||||
std::cout << S.toPrettyString() << std::endl;
|
std::cout << S.toPrettyString() << std::endl;
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
|
@ -534,6 +534,7 @@ namespace Mist{
|
||||||
bool hasStarted = false;
|
bool hasStarted = false;
|
||||||
cfgPointer = config;
|
cfgPointer = config;
|
||||||
globalStreamName = streamName;
|
globalStreamName = streamName;
|
||||||
|
Util::ResizeablePointer newData;
|
||||||
unsigned long long threadCheckTimer = Util::bootSecs();
|
unsigned long long threadCheckTimer = Util::bootSecs();
|
||||||
while (config->is_active){
|
while (config->is_active){
|
||||||
if (tcpCon){
|
if (tcpCon){
|
||||||
|
@ -543,10 +544,11 @@ namespace Mist{
|
||||||
tcpCon.Received().remove(1);
|
tcpCon.Received().remove(1);
|
||||||
}
|
}
|
||||||
if (tcpCon.Received().available(188) && tcpCon.Received().get()[0] == 0x47){
|
if (tcpCon.Received().available(188) && tcpCon.Received().get()[0] == 0x47){
|
||||||
std::string newData = tcpCon.Received().remove(188);
|
newData.truncate(0);
|
||||||
|
tcpCon.Received().remove(newData, 188);
|
||||||
if (rawMode){
|
if (rawMode){
|
||||||
keepAlive();
|
keepAlive();
|
||||||
rawBuffer.append(newData);
|
rawBuffer.append(newData, newData.size());
|
||||||
if (rawBuffer.size() >= 1316 && (lastRawPacket == 0 || lastRawPacket != Util::bootMS())){
|
if (rawBuffer.size() >= 1316 && (lastRawPacket == 0 || lastRawPacket != Util::bootMS())){
|
||||||
if (rawIdx == INVALID_TRACK_ID){
|
if (rawIdx == INVALID_TRACK_ID){
|
||||||
rawIdx = meta.addTrack();
|
rawIdx = meta.addTrack();
|
||||||
|
@ -562,7 +564,7 @@ namespace Mist{
|
||||||
rawBuffer.truncate(0);
|
rawBuffer.truncate(0);
|
||||||
}
|
}
|
||||||
}else {
|
}else {
|
||||||
tsBuf.FromPointer(newData.data());
|
tsBuf.FromPointer(newData);
|
||||||
liveStream.add(tsBuf);
|
liveStream.add(tsBuf);
|
||||||
if (!liveStream.isDataTrack(tsBuf.getPID())){liveStream.parse(tsBuf.getPID());}
|
if (!liveStream.isDataTrack(tsBuf.getPID())){liveStream.parse(tsBuf.getPID());}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Reference in a new issue