Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ BITCOIN_CORE_H = \
net.h \
netaddress.h \
netbase.h \
netmessagemaker.h \
noui.h \
policy/fees.h \
policy/policy.h \
Expand Down
201 changes: 108 additions & 93 deletions src/main.cpp

Large diffs are not rendered by default.

46 changes: 19 additions & 27 deletions src/net.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -768,13 +768,13 @@ const uint256& CNetMessage::GetMessageHash() const
// requires LOCK(cs_vSend)
size_t SocketSendData(CNode *pnode)
{
std::deque<CSerializeData>::iterator it = pnode->vSendMsg.begin();
auto it = pnode->vSendMsg.begin();
size_t nSentSize = 0;

while (it != pnode->vSendMsg.end()) {
const CSerializeData &data = *it;
const auto &data = *it;
assert(data.size() > pnode->nSendOffset);
int nBytes = send(pnode->hSocket, &data[pnode->nSendOffset], data.size() - pnode->nSendOffset, MSG_NOSIGNAL | MSG_DONTWAIT);
int nBytes = send(pnode->hSocket, reinterpret_cast<const char*>(data.data()) + pnode->nSendOffset, data.size() - pnode->nSendOffset, MSG_NOSIGNAL | MSG_DONTWAIT);
if (nBytes > 0) {
pnode->nLastSend = GetTime();
pnode->nSendBytes += nBytes;
Expand Down Expand Up @@ -2612,30 +2612,19 @@ void CNode::AskFor(const CInv& inv)
mapAskFor.insert(std::make_pair(nRequestTime, inv));
}

CDataStream CConnman::BeginMessage(CNode* pnode, int nVersion, int flags, const std::string& sCommand)
void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
{
return {SER_NETWORK, (nVersion ? nVersion : pnode->GetSendVersion()) | flags, CMessageHeader(Params().MessageStart(), sCommand.c_str(), 0) };
}

void CConnman::EndMessage(CDataStream& strm)
{
// Set the size
assert(strm.size () >= CMessageHeader::HEADER_SIZE);
unsigned int nSize = strm.size() - CMessageHeader::HEADER_SIZE;
WriteLE32((uint8_t*)&strm[CMessageHeader::MESSAGE_SIZE_OFFSET], nSize);
// Set the checksum
uint256 hash = Hash(strm.begin() + CMessageHeader::HEADER_SIZE, strm.end());
memcpy((char*)&strm[CMessageHeader::CHECKSUM_OFFSET], hash.begin(), CMessageHeader::CHECKSUM_SIZE);

}
size_t nMessageSize = msg.data.size();
size_t nTotalSize = nMessageSize + CMessageHeader::HEADER_SIZE;
LogPrint("net", "sending %s (%d bytes) peer=%d\n", SanitizeString(msg.command.c_str()), nMessageSize, pnode->id);

void CConnman::PushMessage(CNode* pnode, CDataStream& strm, const std::string& sCommand)
{
if(strm.empty())
return;
std::vector<unsigned char> serializedHeader;
serializedHeader.reserve(CMessageHeader::HEADER_SIZE);
uint256 hash = Hash(msg.data.data(), msg.data.data() + nMessageSize);
CMessageHeader hdr(Params().MessageStart(), msg.command.c_str(), nMessageSize);
memcpy(hdr.pchChecksum, hash.begin(), CMessageHeader::CHECKSUM_SIZE);

unsigned int nSize = strm.size() - CMessageHeader::HEADER_SIZE;
LogPrint("net", "sending %s (%d bytes) peer=%d\n", SanitizeString(sCommand.c_str()), nSize, pnode->id);
CVectorWriter{SER_NETWORK, INIT_PROTO_VERSION, serializedHeader, 0, hdr};

size_t nBytesSent = 0;
{
Expand All @@ -2644,11 +2633,14 @@ void CConnman::PushMessage(CNode* pnode, CDataStream& strm, const std::string& s
return;
}
bool optimisticSend(pnode->vSendMsg.empty());
pnode->vSendMsg.emplace_back(strm.begin(), strm.end());

//log total amount of bytes per command
pnode->mapSendBytesPerMsgCmd[sCommand] += strm.size();
pnode->nSendSize += strm.size();
pnode->mapSendBytesPerMsgCmd[msg.command] += nTotalSize;
pnode->nSendSize += nTotalSize;

pnode->vSendMsg.push_back(std::move(serializedHeader));
if (nMessageSize)
pnode->vSendMsg.push_back(std::move(msg.data));

// If write queue empty, attempt "optimistic write"
if (optimisticSend == true)
Expand Down
49 changes: 17 additions & 32 deletions src/net.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,20 @@ class CTransaction;
class CNodeStats;
class CClientUIInterface;

struct CSerializedNetMsg
{
CSerializedNetMsg() = default;
CSerializedNetMsg(CSerializedNetMsg&&) = default;
CSerializedNetMsg& operator=(CSerializedNetMsg&&) = default;
// No copying, only moves.
CSerializedNetMsg(const CSerializedNetMsg& msg) = delete;
CSerializedNetMsg& operator=(const CSerializedNetMsg&) = delete;

std::vector<unsigned char> data;
std::string command;
};


class CConnman
{
public:
Expand Down Expand Up @@ -138,32 +152,7 @@ class CConnman

bool ForNode(NodeId id, std::function<bool(CNode* pnode)> func);

template <typename... Args>
void PushMessageWithVersionAndFlag(CNode* pnode, int nVersion, int flag, const std::string& sCommand, Args&&... args)
{
auto msg(BeginMessage(pnode, nVersion, flag, sCommand));
::SerializeMany(msg, std::forward<Args>(args)...);
EndMessage(msg);
PushMessage(pnode, msg, sCommand);
}

template <typename... Args>
void PushMessageWithFlag(CNode* pnode, int flag, const std::string& sCommand, Args&&... args)
{
PushMessageWithVersionAndFlag(pnode, 0, flag, sCommand, std::forward<Args>(args)...);
}

template <typename... Args>
void PushMessageWithVersion(CNode* pnode, int nVersion, const std::string& sCommand, Args&&... args)
{
PushMessageWithVersionAndFlag(pnode, nVersion, 0, sCommand, std::forward<Args>(args)...);
}

template <typename... Args>
void PushMessage(CNode* pnode, const std::string& sCommand, Args&&... args)
{
PushMessageWithVersionAndFlag(pnode, 0, 0, sCommand, std::forward<Args>(args)...);
}
void PushMessage(CNode* pnode, CSerializedNetMsg&& msg);

template<typename Callable>
bool ForEachNodeContinueIf(Callable&& func)
Expand Down Expand Up @@ -374,10 +363,6 @@ class CConnman

unsigned int GetReceiveFloodSize() const;

CDataStream BeginMessage(CNode* node, int nVersion, int flags, const std::string& sCommand);
void PushMessage(CNode* pnode, CDataStream& strm, const std::string& sCommand);
void EndMessage(CDataStream& strm);

// Network stats
void RecordBytesRecv(uint64_t bytes);
void RecordBytesSent(uint64_t bytes);
Expand Down Expand Up @@ -601,7 +586,7 @@ class CNode
size_t nSendSize; // total size of all vSendMsg entries
size_t nSendOffset; // offset inside the first vSendMsg already sent
uint64_t nSendBytes;
std::deque<CSerializeData> vSendMsg;
std::deque<std::vector<unsigned char>> vSendMsg;
CCriticalSection cs_vSend;

std::deque<CInv> vRecvGetData;
Expand Down Expand Up @@ -771,7 +756,7 @@ class CNode
{
// The send version should always be explicitly set to
// INIT_PROTO_VERSION rather than using this value until the handshake
// is complete. See PushMessageWithVersion().
// is complete.
assert(nSendVersion != 0);
return nSendVersion;
}
Expand Down
36 changes: 36 additions & 0 deletions src/netmessagemaker.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright (c) 2009-2010 Satoshi Nakamoto
// Copyright (c) 2009-2016 The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.

#ifndef BITCOIN_NETMESSAGEMAKER_H
#define BITCOIN_NETMESSAGEMAKER_H

#include "net.h"
#include "serialize.h"

class CNetMsgMaker
{
public:
CNetMsgMaker(int nVersionIn) : nVersion(nVersionIn){}

template <typename... Args>
CSerializedNetMsg Make(int nFlags, std::string sCommand, Args&&... args)
{
CSerializedNetMsg msg;
msg.command = std::move(sCommand);
CVectorWriter{ SER_NETWORK, nFlags | nVersion, msg.data, 0, std::forward<Args>(args)... };
return msg;
}

template <typename... Args>
CSerializedNetMsg Make(std::string sCommand, Args&&... args)
{
return Make(0, std::move(sCommand), std::forward<Args>(args)...);
}

private:
const int nVersion;
};

#endif // BITCOIN_NETMESSAGEMAKER_H
69 changes: 69 additions & 0 deletions src/streams.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,75 @@ OverrideStream<S> WithOrVersion(S* s, int nVersionFlag)
return OverrideStream<S>(s, s->GetType(), s->GetVersion() | nVersionFlag);
}

/* Minimal stream for overwriting and/or appending to an existing byte vector
*
* The referenced vector will grow as necessary
*/
class CVectorWriter
{
public:

/*
* @param[in] nTypeIn Serialization Type
* @param[in] nVersionIn Serialization Version (including any flags)
* @param[in] vchDataIn Referenced byte vector to overwrite/append
* @param[in] nPosIn Starting position. Vector index where writes should start. The vector will initially
* grow as necessary to max(index, vec.size()). So to append, use vec.size().
*/
CVectorWriter(int nTypeIn, int nVersionIn, std::vector<unsigned char>& vchDataIn, size_t nPosIn) : nType(nTypeIn), nVersion(nVersionIn), vchData(vchDataIn), nPos(nPosIn)
{
if(nPos > vchData.size())
vchData.resize(nPos);
}
/*
* (other params same as above)
* @param[in] args A list of items to serialize starting at nPos.
*/
template <typename... Args>
CVectorWriter(int nTypeIn, int nVersionIn, std::vector<unsigned char>& vchDataIn, size_t nPosIn, Args&&... args) : CVectorWriter(nTypeIn, nVersionIn, vchDataIn, nPosIn)
{
::SerializeMany(*this, std::forward<Args>(args)...);
}
void write(const char* pch, size_t nSize)
{
assert(nPos <= vchData.size());
size_t nOverwrite = std::min(nSize, vchData.size() - nPos);
if (nOverwrite) {
memcpy(vchData.data() + nPos, reinterpret_cast<const unsigned char*>(pch), nOverwrite);
}
if (nOverwrite < nSize) {
vchData.insert(vchData.end(), reinterpret_cast<const unsigned char*>(pch) + nOverwrite, reinterpret_cast<const unsigned char*>(pch) + nSize);
}
nPos += nSize;
}
template<typename T>
CVectorWriter& operator<<(const T& obj)
{
// Serialize to this stream
::Serialize(*this, obj);
return (*this);
}
int GetVersion() const
{
return nVersion;
}
int GetType() const
{
return nType;
}
void seek(size_t nSize)
{
nPos += nSize;
if(nPos > vchData.size())
vchData.resize(nPos);
}
private:
const int nType;
const int nVersion;
std::vector<unsigned char>& vchData;
size_t nPos;
};

/** Double ended buffer combining vector and stream-like interfaces.
*
* >> and << read and write unformatted data using the above serialization templates.
Expand Down
58 changes: 58 additions & 0 deletions src/test/streams_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,64 @@ using namespace boost::assign; // bring 'operator+=()' into scope

BOOST_FIXTURE_TEST_SUITE(streams_tests, BasicTestingSetup)

BOOST_AUTO_TEST_CASE(streams_vector_writer)
{
unsigned char a(1);
unsigned char b(2);
unsigned char bytes[] = { 3, 4, 5, 6 };
std::vector<unsigned char> vch;

// Each test runs twice. Serializing a second time at the same starting
// point should yield the same results, even if the first test grew the
// vector.

CVectorWriter(SER_NETWORK, INIT_PROTO_VERSION, vch, 0, a, b);
BOOST_CHECK((vch == std::vector<unsigned char>{{1, 2}}));
CVectorWriter(SER_NETWORK, INIT_PROTO_VERSION, vch, 0, a, b);
BOOST_CHECK((vch == std::vector<unsigned char>{{1, 2}}));
vch.clear();

CVectorWriter(SER_NETWORK, INIT_PROTO_VERSION, vch, 2, a, b);
BOOST_CHECK((vch == std::vector<unsigned char>{{0, 0, 1, 2}}));
CVectorWriter(SER_NETWORK, INIT_PROTO_VERSION, vch, 2, a, b);
BOOST_CHECK((vch == std::vector<unsigned char>{{0, 0, 1, 2}}));
vch.clear();

vch.resize(5, 0);
CVectorWriter(SER_NETWORK, INIT_PROTO_VERSION, vch, 2, a, b);
BOOST_CHECK((vch == std::vector<unsigned char>{{0, 0, 1, 2, 0}}));
CVectorWriter(SER_NETWORK, INIT_PROTO_VERSION, vch, 2, a, b);
BOOST_CHECK((vch == std::vector<unsigned char>{{0, 0, 1, 2, 0}}));
vch.clear();

vch.resize(4, 0);
CVectorWriter(SER_NETWORK, INIT_PROTO_VERSION, vch, 3, a, b);
BOOST_CHECK((vch == std::vector<unsigned char>{{0, 0, 0, 1, 2}}));
CVectorWriter(SER_NETWORK, INIT_PROTO_VERSION, vch, 3, a, b);
BOOST_CHECK((vch == std::vector<unsigned char>{{0, 0, 0, 1, 2}}));
vch.clear();

vch.resize(4, 0);
CVectorWriter(SER_NETWORK, INIT_PROTO_VERSION, vch, 4, a, b);
BOOST_CHECK((vch == std::vector<unsigned char>{{0, 0, 0, 0, 1, 2}}));
CVectorWriter(SER_NETWORK, INIT_PROTO_VERSION, vch, 4, a, b);
BOOST_CHECK((vch == std::vector<unsigned char>{{0, 0, 0, 0, 1, 2}}));
vch.clear();

CVectorWriter(SER_NETWORK, INIT_PROTO_VERSION, vch, 0, FLATDATA(bytes));
BOOST_CHECK((vch == std::vector<unsigned char>{{3, 4, 5, 6}}));
CVectorWriter(SER_NETWORK, INIT_PROTO_VERSION, vch, 0, FLATDATA(bytes));
BOOST_CHECK((vch == std::vector<unsigned char>{{3, 4, 5, 6}}));
vch.clear();

vch.resize(4, 8);
CVectorWriter(SER_NETWORK, INIT_PROTO_VERSION, vch, 2, a, FLATDATA(bytes), b);
BOOST_CHECK((vch == std::vector<unsigned char>{{8, 8, 1, 3, 4, 5, 6, 2}}));
CVectorWriter(SER_NETWORK, INIT_PROTO_VERSION, vch, 2, a, FLATDATA(bytes), b);
BOOST_CHECK((vch == std::vector<unsigned char>{{8, 8, 1, 3, 4, 5, 6, 2}}));
vch.clear();
}

BOOST_AUTO_TEST_CASE(streams_serializedata_xor)
{
std::vector<char> in;
Expand Down