Skip to content

Commit 8f9011d

Browse files
theuniFuzzbawls
authored andcommitted
connman is in charge of pushing messages
The changes here are dense and subtle, but hopefully all is more explicit than before. - CConnman is now in charge of sending data rather than the nodes themselves. This is necessary because many decisions need to be made with all nodes in mind, and a model that requires the nodes calling up to their manager quickly turns to spaghetti. - The per-node-serializer (ssSend) has been replaced with a (quasi-)const send-version. Since the send version for serialization can only change once per connection, we now explicitly tag messages with INIT_PROTO_VERSION if they are sent before the handshake. With this done, there's no need to lock for access to nSendVersion. Also, a new stream is used for each message, so there's no need to lock during the serialization process. - This takes care of accounting for optimistic sends, so the nOptimisticBytesWritten hack can be removed. - -dropmessagestest and -fuzzmessagestest have not been preserved, as I suspect they haven't been used in years.
1 parent f558bb7 commit 8f9011d

File tree

4 files changed

+134
-28
lines changed

4 files changed

+134
-28
lines changed

src/main.cpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5261,7 +5261,7 @@ bool static ProcessMessage(CNode* pfrom, std::string strCommand, CDataStream& vR
52615261

52625262
// Each connection can only send one version message
52635263
if (pfrom->nVersion != 0) {
5264-
pfrom->PushMessage(NetMsgType::REJECT, strCommand, REJECT_DUPLICATE, std::string("Duplicate version message"));
5264+
connman.PushMessageWithVersion(pfrom, INIT_PROTO_VERSION, NetMsgType::REJECT, strCommand, REJECT_DUPLICATE, std::string("Duplicate version message"));
52655265
LOCK(cs_main);
52665266
Misbehaving(pfrom->GetId(), 1);
52675267
return false;
@@ -5279,7 +5279,7 @@ bool static ProcessMessage(CNode* pfrom, std::string strCommand, CDataStream& vR
52795279
}
52805280
if (pfrom->nServicesExpected & ~pfrom->nServices) {
52815281
LogPrint(BCLog::NET, "peer=%d does not offer the expected services (%08x offered, %08x expected); disconnecting\n", pfrom->id, pfrom->nServices, pfrom->nServicesExpected);
5282-
pfrom->PushMessage(NetMsgType::REJECT, strCommand, REJECT_NONSTANDARD,
5282+
connman.PushMessageWithVersion(pfrom, INIT_PROTO_VERSION, NetMsgType::REJECT, strCommand, REJECT_NONSTANDARD,
52835283
strprintf("Expected to offer services %08x", pfrom->nServicesExpected));
52845284
pfrom->fDisconnect = true;
52855285
return false;
@@ -5332,7 +5332,7 @@ bool static ProcessMessage(CNode* pfrom, std::string strCommand, CDataStream& vR
53325332

53335333
// Be shy and don't send version until we hear
53345334
if (pfrom->fInbound)
5335-
pfrom->PushVersion();
5335+
connman.PushVersion(pfrom, GetAdjustedTime());
53365336

53375337
pfrom->fClient = !(pfrom->nServices & NODE_NETWORK);
53385338

@@ -5342,8 +5342,8 @@ bool static ProcessMessage(CNode* pfrom, std::string strCommand, CDataStream& vR
53425342
UpdatePreferredDownload(pfrom, State(pfrom->GetId()));
53435343
}
53445344
// Change version
5345-
pfrom->PushMessage(NetMsgType::VERACK);
5346-
pfrom->ssSend.SetVersion(std::min(pfrom->nVersion, PROTOCOL_VERSION));
5345+
connman.PushMessageWithVersion(pfrom, INIT_PROTO_VERSION, NetMsgType::VERACK);
5346+
pfrom->SetSendVersion(std::min(pfrom->nVersion, PROTOCOL_VERSION));
53475347

53485348
if (!pfrom->fInbound) {
53495349
// Advertise our address
@@ -6150,7 +6150,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru
61506150
if (!pfrom->vRecvGetData.empty())
61516151
fMoreWork = true;
61526152
} catch (const std::ios_base::failure& e) {
6153-
pfrom->PushMessage(NetMsgType::REJECT, strCommand, REJECT_MALFORMED, std::string("error parsing message"));
6153+
connman.PushMessageWithVersion(pfrom, INIT_PROTO_VERSION, NetMsgType::REJECT, strCommand, REJECT_MALFORMED, std::string("error parsing message"));
61546154
if (strstr(e.what(), "end of data")) {
61556155
// Allow exceptions from under-length message on vRecv
61566156
LogPrintf("ProcessMessages(%s, %u bytes): Exception '%s' caught, normally caused by a message being shorter than its stated length\n", SanitizeString(strCommand), nMessageSize, e.what());

src/net.cpp

Lines changed: 69 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -418,6 +418,9 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char* pszDest, bool fCo
418418
uint64_t nonce = GetDeterministicRandomizer(RANDOMIZER_ID_LOCALHOSTNONCE).Write(id).Finalize();
419419
CNode* pnode = new CNode(id, nLocalServices, GetBestHeight(), hSocket, addrConnect, CalculateKeyedNetGroup(addrConnect), nonce, pszDest ? pszDest : "", false);
420420

421+
422+
PushVersion(pnode, GetTime());
423+
421424
GetNodeSignals().InitializeNode(pnode->GetId(), pnode);
422425
pnode->AddRef();
423426

@@ -439,6 +442,24 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char* pszDest, bool fCo
439442
return NULL;
440443
}
441444

445+
void CConnman::PushVersion(CNode* pnode, int64_t nTime)
446+
{
447+
ServiceFlags nLocalNodeServices = pnode->GetLocalServices();
448+
CAddress addrYou = (pnode->addr.IsRoutable() && !IsProxy(pnode->addr) ? pnode->addr : CAddress(CService(), pnode->addr.nServices));
449+
CAddress addrMe = CAddress(CService(), nLocalNodeServices);
450+
uint64_t nonce = pnode->GetLocalNonce();
451+
int nNodeStartingHeight = pnode->nMyStartingHeight;
452+
NodeId id = pnode->GetId();
453+
454+
PushMessageWithVersion(pnode, INIT_PROTO_VERSION, NetMsgType::VERSION, PROTOCOL_VERSION, (uint64_t)nLocalNodeServices, nTime, addrYou, addrMe,
455+
nonce, strSubVersion, nNodeStartingHeight, true);
456+
457+
if (fLogIPs)
458+
LogPrint(BCLog::NET, "send version message: version %d, blocks=%d, us=%s, them=%s, peer=%d\n", PROTOCOL_VERSION, nNodeStartingHeight, addrMe.ToString(), addrYou.ToString(), id);
459+
else
460+
LogPrint(BCLog::NET, "send version message: version %d, blocks=%d, us=%s, peer=%d\n", PROTOCOL_VERSION, nNodeStartingHeight, addrMe.ToString(), id);
461+
}
462+
442463
void CConnman::DumpBanlist()
443464
{
444465
SweepBanned(); // clean unused entries (if bantime has expired)
@@ -480,20 +501,6 @@ bool CNode::DisconnectOldProtocol(int nVersionRequired, std::string strLastComma
480501
return fDisconnect;
481502
}
482503

483-
void CNode::PushVersion()
484-
{
485-
int64_t nTime = (fInbound ? GetAdjustedTime() : GetTime());
486-
CAddress addrYou = (addr.IsRoutable() && !IsProxy(addr) ? addr : CAddress(CService(), addr.nServices));
487-
CAddress addrMe = CAddress(CService(), nLocalServices);
488-
if (fLogIPs)
489-
LogPrint(BCLog::NET, "send version message: version %d, blocks=%d, us=%s, them=%s, peer=%d\n", PROTOCOL_VERSION, nMyStartingHeight, addrMe.ToString(), addrYou.ToString(), id);
490-
else
491-
LogPrint(BCLog::NET, "send version message: version %d, blocks=%d, us=%s, peer=%d\n", PROTOCOL_VERSION, nMyStartingHeight, addrMe.ToString(), id);
492-
PushMessage(NetMsgType::VERSION, PROTOCOL_VERSION, (uint64_t)nLocalServices, nTime, addrYou, addrMe,
493-
nLocalHostNonce, strSubVersion, nMyStartingHeight, true);
494-
}
495-
496-
497504
void CConnman::ClearBanned()
498505
{
499506
{
@@ -2485,7 +2492,8 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn
24852492
filterInventoryKnown(50000, 0.000001),
24862493
nLocalHostNonce(nLocalHostNonceIn),
24872494
nLocalServices(nLocalServicesIn),
2488-
nMyStartingHeight(nMyStartingHeightIn)
2495+
nMyStartingHeight(nMyStartingHeightIn),
2496+
nSendVersion(0)
24892497
{
24902498
nServices = NODE_NONE;
24912499
nServicesExpected = NODE_NONE;
@@ -2536,10 +2544,6 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn
25362544
LogPrint(BCLog::NET, "Added connection to %s peer=%d\n", addrName, id);
25372545
else
25382546
LogPrint(BCLog::NET, "Added connection peer=%d\n", id);
2539-
2540-
// Be shy and don't send version until we hear
2541-
if (hSocket != INVALID_SOCKET && !fInbound)
2542-
PushVersion();
25432547
}
25442548

25452549
CNode::~CNode()
@@ -2642,6 +2646,52 @@ void CNode::EndMessage(const char* pszCommand) UNLOCK_FUNCTION(cs_vSend)
26422646
LEAVE_CRITICAL_SECTION(cs_vSend);
26432647
}
26442648

2649+
CDataStream CConnman::BeginMessage(CNode* pnode, int nVersion, int flags, const std::string& sCommand)
2650+
{
2651+
return {SER_NETWORK, (nVersion ? nVersion : pnode->GetSendVersion()) | flags, CMessageHeader(Params().MessageStart(), sCommand.c_str(), 0) };
2652+
}
2653+
2654+
void CConnman::EndMessage(CDataStream& strm)
2655+
{
2656+
// Set the size
2657+
assert(strm.size () >= CMessageHeader::HEADER_SIZE);
2658+
unsigned int nSize = strm.size() - CMessageHeader::HEADER_SIZE;
2659+
WriteLE32((uint8_t*)&strm[CMessageHeader::MESSAGE_SIZE_OFFSET], nSize);
2660+
// Set the checksum
2661+
uint256 hash = Hash(strm.begin() + CMessageHeader::HEADER_SIZE, strm.end());
2662+
memcpy((char*)&strm[CMessageHeader::CHECKSUM_OFFSET], hash.begin(), CMessageHeader::CHECKSUM_SIZE);
2663+
2664+
}
2665+
2666+
void CConnman::PushMessage(CNode* pnode, CDataStream& strm, const std::string& sCommand)
2667+
{
2668+
if(strm.empty())
2669+
return;
2670+
2671+
unsigned int nSize = strm.size() - CMessageHeader::HEADER_SIZE;
2672+
LogPrint(BCLog::NET, "sending %s (%d bytes) peer=%d\n", SanitizeString(sCommand.c_str()), nSize, pnode->id);
2673+
2674+
size_t nBytesSent = 0;
2675+
{
2676+
LOCK(pnode->cs_vSend);
2677+
if(pnode->hSocket == INVALID_SOCKET) {
2678+
return;
2679+
}
2680+
bool optimisticSend(pnode->vSendMsg.empty());
2681+
pnode->vSendMsg.emplace_back(strm.begin(), strm.end());
2682+
2683+
//log total amount of bytes per command
2684+
pnode->mapSendBytesPerMsgCmd[sCommand] += strm.size();
2685+
pnode->nSendSize += strm.size();
2686+
2687+
// If write queue empty, attempt "optimistic write"
2688+
if (optimisticSend == true)
2689+
nBytesSent = SocketSendData(pnode);
2690+
}
2691+
if (nBytesSent)
2692+
RecordBytesSent(nBytesSent);
2693+
}
2694+
26452695
bool CConnman::ForNode(NodeId id, std::function<bool(CNode* pnode)> func)
26462696
{
26472697
CNode* found = nullptr;

src/net.h

Lines changed: 55 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,36 @@ class CConnman
140140

141141
bool ForNode(NodeId id, std::function<bool(CNode* pnode)> func);
142142

143+
template <typename... Args>
144+
void PushMessageWithVersionAndFlag(CNode* pnode, int nVersion, int flag, const std::string& sCommand, Args&&... args)
145+
{
146+
auto msg(BeginMessage(pnode, nVersion, flag, sCommand));
147+
::SerializeMany(msg, std::forward<Args>(args)...);
148+
EndMessage(msg);
149+
PushMessage(pnode, msg, sCommand);
150+
}
151+
152+
template <typename... Args>
153+
void PushMessageWithFlag(CNode* pnode, int flag, const std::string& sCommand, Args&&... args)
154+
{
155+
PushMessageWithVersionAndFlag(pnode, 0, flag, sCommand, std::forward<Args>(args)...);
156+
}
157+
158+
template <typename... Args>
159+
void PushMessageWithVersion(CNode* pnode, int nVersion, const std::string& sCommand, Args&&... args)
160+
{
161+
PushMessageWithVersionAndFlag(pnode, nVersion, 0, sCommand, std::forward<Args>(args)...);
162+
}
163+
164+
template <typename... Args>
165+
void PushMessage(CNode* pnode, const std::string& sCommand, Args&&... args)
166+
{
167+
PushMessageWithVersionAndFlag(pnode, 0, 0, sCommand, std::forward<Args>(args)...);
168+
}
169+
170+
void PushVersion(CNode* pnode, int64_t nTime);
171+
172+
143173
template<typename Callable>
144174
bool ForEachNodeContinueIf(Callable&& func)
145175
{
@@ -332,6 +362,10 @@ class CConnman
332362
void DumpData();
333363
void DumpBanlist();
334364

365+
CDataStream BeginMessage(CNode* node, int nVersion, int flags, const std::string& sCommand);
366+
void PushMessage(CNode* pnode, CDataStream& strm, const std::string& sCommand);
367+
void EndMessage(CDataStream& strm);
368+
335369
// Network stats
336370
void RecordBytesRecv(uint64_t bytes);
337371
void RecordBytesSent(uint64_t bytes);
@@ -548,6 +582,7 @@ class CNetMessage
548582
/** Information about a peer */
549583
class CNode
550584
{
585+
friend class CConnman;
551586
public:
552587
// socket
553588
ServiceFlags nServices;
@@ -660,6 +695,7 @@ class CNode
660695
// Services offered to this peer
661696
const ServiceFlags nLocalServices;
662697
const int nMyStartingHeight;
698+
int nSendVersion;
663699
public:
664700
NodeId GetId() const
665701
{
@@ -696,6 +732,25 @@ class CNode
696732
{
697733
return nRecvVersion;
698734
}
735+
void SetSendVersion(int nVersionIn)
736+
{
737+
// Send version may only be changed in the version message, and
738+
// only one version message is allowed per session. We can therefore
739+
// treat this value as const and even atomic as long as it's only used
740+
// once the handshake is complete. Any attempt to set this twice is an
741+
// error.
742+
assert(nSendVersion == 0);
743+
nSendVersion = nVersionIn;
744+
}
745+
746+
int GetSendVersion() const
747+
{
748+
// The send version should always be explicitly set to
749+
// INIT_PROTO_VERSION rather than using this value until the handshake
750+
// is complete. See PushMessageWithVersion().
751+
assert(nSendVersion != 0);
752+
return nSendVersion;
753+
}
699754

700755
CNode* AddRef()
701756
{
@@ -758,9 +813,6 @@ class CNode
758813
// TODO: Document the precondition of this function. Is cs_vSend locked?
759814
void EndMessage(const char* pszCommand) UNLOCK_FUNCTION(cs_vSend);
760815

761-
void PushVersion();
762-
763-
764816
void PushMessage(const char* pszCommand)
765817
{
766818
try {

src/test/DoS_tests.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ BOOST_AUTO_TEST_CASE(DoS_banning)
5959
connman->ClearBanned();
6060
CAddress addr1(ip(0xa0b0c001), NODE_NONE);
6161
CNode dummyNode1(id++, NODE_NETWORK, 0, INVALID_SOCKET, addr1, 0, 0, "", true);
62+
dummyNode1.SetSendVersion(PROTOCOL_VERSION);
6263
GetNodeSignals().InitializeNode(dummyNode1.GetId(), &dummyNode1);
6364
dummyNode1.nVersion = 1;
6465
misbehave(dummyNode1.GetId(), 100); // Should get banned
@@ -68,6 +69,7 @@ BOOST_AUTO_TEST_CASE(DoS_banning)
6869

6970
CAddress addr2(ip(0xa0b0c002), NODE_NONE);
7071
CNode dummyNode2(id++, NODE_NETWORK, 0, INVALID_SOCKET, addr2, 1, 1, "", true);
72+
dummyNode2.SetSendVersion(PROTOCOL_VERSION);
7173
GetNodeSignals().InitializeNode(dummyNode2.GetId(), &dummyNode2);
7274
dummyNode2.nVersion = 1;
7375
misbehave(dummyNode2.GetId(), 50);
@@ -87,6 +89,7 @@ BOOST_AUTO_TEST_CASE(DoS_banscore)
8789
mapArgs["-banscore"] = "111"; // because 11 is my favorite number
8890
CAddress addr1(ip(0xa0b0c001), NODE_NONE);
8991
CNode dummyNode1(id++, NODE_NETWORK, 0, INVALID_SOCKET, addr1, 3, 1, "", true);
92+
dummyNode1.SetSendVersion(PROTOCOL_VERSION);
9093
GetNodeSignals().InitializeNode(dummyNode1.GetId(), &dummyNode1);
9194
dummyNode1.nVersion = 1;
9295
misbehave(dummyNode1.GetId(), 100);
@@ -111,6 +114,7 @@ BOOST_AUTO_TEST_CASE(DoS_bantime)
111114

112115
CAddress addr(ip(0xa0b0c001), NODE_NONE);
113116
CNode dummyNode(id++, NODE_NETWORK, 0, INVALID_SOCKET, addr, 4, 4, "", true);
117+
dummyNode.SetSendVersion(PROTOCOL_VERSION);
114118
GetNodeSignals().InitializeNode(dummyNode.GetId(), &dummyNode);
115119
dummyNode.nVersion = 1;
116120

0 commit comments

Comments
 (0)