Skip to content

Commit c5fd91e

Browse files
committed
Rate limit the processing of rumoured addresses
Set for every instance and fix test peer, remove var Refactor debug logs Adjust p2p invalid msgs
1 parent e1e376e commit c5fd91e

File tree

8 files changed

+168
-12
lines changed

8 files changed

+168
-12
lines changed

src/net.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,12 @@ static const unsigned int MAX_INV_SZ = 50000;
5353
static const unsigned int MAX_LOCATOR_SZ = 101;
5454
/** The maximum number of addresses from our addrman to return in response to a getaddr message. */
5555
static constexpr size_t MAX_ADDR_TO_SEND = 1000;
56+
/** The maximum rate of address records we're willing to process on average. */
57+
static constexpr double MAX_ADDR_RATE_PER_SECOND{0.1};
58+
/** The soft limit of the address processing token bucket (the regular MAX_ADDR_RATE_PER_SECOND
59+
* based increments won't go above this, but the MAX_ADDR_TO_SEND increment following GETADDR
60+
* is exempt from this limit. */
61+
static constexpr size_t MAX_ADDR_PROCESSING_TOKEN_BUCKET{MAX_ADDR_TO_SEND};
5662
/** Maximum length of incoming protocol messages (no message over 2 MiB is currently acceptable). */
5763
static const unsigned int MAX_PROTOCOL_MESSAGE_LENGTH = 2 * 1024 * 1024;
5864
/** Maximum length of strSubVer in `version` message */
@@ -722,6 +728,11 @@ class CNode
722728
std::set<uint256> setKnown;
723729
std::chrono::microseconds m_next_addr_send GUARDED_BY(cs_sendProcessing){0};
724730
std::chrono::microseconds m_next_local_addr_send GUARDED_BY(cs_sendProcessing){0};
731+
/** Number of addresses that can be processed from this peer. Start at 10 to
732+
* permit self-announcement and starting peer propagation */
733+
double m_addr_token_bucket{10.0};
734+
/** When m_addr_token_bucket was last updated */
735+
std::chrono::microseconds m_addr_token_timestamp{GetTime<std::chrono::microseconds>()};
725736

726737
// inventory based relay
727738
CRollingBloomFilter filterInventoryKnown;

src/net_processing.cpp

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@
2929
#include "util/validation.h"
3030
#include "validation.h"
3131

32+
#include <chrono>
33+
34+
using namespace std::chrono_literals;
35+
3236
int64_t nTimeBestReceived = 0; // Used only to inform the wallet of when we last received a block
3337

3438
static const uint64_t RANDOMIZER_ID_ADDRESS_RELAY = 0x3cac0035b5866b90ULL; // SHA256("main address relay")[0:8]
@@ -220,6 +224,10 @@ struct CNodeState {
220224
int nBlocksInFlight;
221225
//! Whether we consider this a preferred download peer.
222226
bool fPreferredDownload;
227+
//! Addresses processed
228+
uint64_t amt_addr_processed = 0;
229+
//! Addresses rate limited
230+
uint64_t amt_addr_rate_limited = 0;
223231

224232
CNodeBlocks nodeBlocks;
225233

@@ -494,6 +502,9 @@ bool GetNodeStateStats(NodeId nodeid, CNodeStateStats& stats)
494502
if (queue.pindex)
495503
stats.vHeightInFlight.push_back(queue.pindex->nHeight);
496504
}
505+
506+
stats.m_addr_processed = state->amt_addr_processed;
507+
stats.m_addr_rate_limited = state->amt_addr_rate_limited;
497508
return true;
498509
}
499510

@@ -1346,6 +1357,9 @@ bool static ProcessMessage(CNode* pfrom, std::string strCommand, CDataStream& vR
13461357
// Get recent addresses
13471358
connman->PushMessage(pfrom, CNetMsgMaker(nSendVersion).Make(NetMsgType::GETADDR));
13481359
pfrom->fGetAddr = true;
1360+
// When requesting a getaddr, accept an additional MAX_ADDR_TO_SEND addresses in response
1361+
// (bypassing the MAX_ADDR_PROCESSING_TOKEN_BUCKET limit).
1362+
pfrom->m_addr_token_bucket += MAX_ADDR_TO_SEND;
13491363
connman->MarkAddressGood(pfrom->addr);
13501364
}
13511365

@@ -1494,16 +1508,42 @@ bool static ProcessMessage(CNode* pfrom, std::string strCommand, CDataStream& vR
14941508
std::vector<CAddress> vAddrOk;
14951509
int64_t nNow = GetAdjustedTime();
14961510
int64_t nSince = nNow - 10 * 60;
1511+
1512+
// Update/increment addr rate limiting bucket.
1513+
// TODO: Slight time improvement calculation, continue backporting
1514+
const auto current_time = GetTime<std::chrono::microseconds>();
1515+
if (pfrom->m_addr_token_bucket < MAX_ADDR_PROCESSING_TOKEN_BUCKET) {
1516+
// Don't increment bucket if it's already full
1517+
const auto time_diff = std::max(current_time - pfrom->m_addr_token_timestamp, 0us);
1518+
const double increment = CountSecondsDouble(time_diff) * MAX_ADDR_RATE_PER_SECOND;
1519+
pfrom->m_addr_token_bucket = std::min<double>(pfrom->m_addr_token_bucket + increment, MAX_ADDR_PROCESSING_TOKEN_BUCKET);
1520+
}
1521+
pfrom->m_addr_token_timestamp = current_time;
1522+
1523+
uint64_t num_proc = 0;
1524+
uint64_t num_rate_limit = 0;
1525+
Shuffle(vAddr.begin(), vAddr.end(), FastRandomContext());
1526+
14971527
for (CAddress& addr : vAddr) {
14981528
if (interruptMsgProc)
14991529
return true;
15001530

1531+
// Apply rate limiting.
1532+
if (pfrom->m_addr_token_bucket < 1.0) {
1533+
++num_rate_limit;
1534+
continue;
1535+
} else {
1536+
pfrom->m_addr_token_bucket -= 1.0;
1537+
}
1538+
15011539
if ((addr.nServices & REQUIRED_SERVICES) != REQUIRED_SERVICES)
15021540
continue;
15031541

15041542
if (addr.nTime <= 100000000 || addr.nTime > nNow + 10 * 60)
15051543
addr.nTime = nNow - 5 * 24 * 60 * 60;
15061544
pfrom->AddAddressKnown(addr);
1545+
if (connman->IsBanned(addr)) continue; // Do not process banned addresses beyond remembering we received them
1546+
++num_proc;
15071547
bool fReachable = IsReachable(addr);
15081548
if (addr.nTime > nSince && !pfrom->fGetAddr && vAddr.size() <= 10 && addr.IsRoutable()) {
15091549
// Relay to a limited number of other nodes
@@ -1513,6 +1553,11 @@ bool static ProcessMessage(CNode* pfrom, std::string strCommand, CDataStream& vR
15131553
if (fReachable)
15141554
vAddrOk.push_back(addr);
15151555
}
1556+
CNodeState* state = State(pfrom->GetId());
1557+
state->amt_addr_processed += num_proc;
1558+
state->amt_addr_rate_limited += num_rate_limit;
1559+
LogPrint(BCLog::NET, "Received addr: %u addresses (%u processed, %u rate-limited) from peer=%d\n",
1560+
vAddr.size(), num_proc, num_rate_limit, pfrom->GetId());
15161561
connman->AddNewAddresses(vAddrOk, pfrom->addr, 2 * 60 * 60);
15171562
if (vAddr.size() < 1000)
15181563
pfrom->fGetAddr = false;
@@ -1962,8 +2007,11 @@ bool static ProcessMessage(CNode* pfrom, std::string strCommand, CDataStream& vR
19622007
pfrom->vAddrToSend.clear();
19632008
std::vector<CAddress> vAddr = connman->GetAddresses(MAX_ADDR_TO_SEND, MAX_PCT_ADDR_TO_SEND, /* network */ nullopt);
19642009
FastRandomContext insecure_rand;
1965-
for (const CAddress& addr : vAddr)
1966-
pfrom->PushAddress(addr, insecure_rand);
2010+
for (const CAddress& addr : vAddr) {
2011+
if (!connman->IsBanned(addr)) {
2012+
pfrom->PushAddress(addr, insecure_rand);
2013+
}
2014+
}
19672015
}
19682016

19692017

src/net_processing.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ struct CNodeStateStats {
6464
int nSyncHeight;
6565
int nCommonHeight;
6666
std::vector<int> vHeightInFlight;
67+
uint64_t m_addr_processed = 0;
68+
uint64_t m_addr_rate_limited = 0;
6769
};
6870

6971
/** Get statistics from node state */
@@ -72,4 +74,11 @@ bool GetNodeStateStats(NodeId nodeid, CNodeStateStats& stats);
7274
void Misbehaving(NodeId nodeid, int howmuch, const std::string& message="") EXCLUSIVE_LOCKS_REQUIRED(cs_main);
7375
bool IsBanned(NodeId nodeid);
7476

77+
78+
using SecondsDouble = std::chrono::duration<double, std::chrono::seconds::period>;
79+
/**
80+
* Helper to count the seconds in any std::chrono::duration type
81+
*/
82+
inline double CountSecondsDouble(SecondsDouble t) { return t.count(); }
83+
7584
#endif // BITCOIN_NET_PROCESSING_H

src/rpc/net.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,8 @@ UniValue getpeerinfo(const JSONRPCRequest& request)
101101
" n, (numeric) The heights of blocks we're currently asking from this peer\n"
102102
" ...\n"
103103
" ]\n"
104+
" \"addr_processed\": n, (numeric) The total number of addresses processed, excluding those dropped due to rate limiting\n"
105+
" \"addr_rate_limited\": n, (numeric) The total number of addresses dropped due to rate limiting\n"
104106
" \"bytessent_per_msg\": {\n"
105107
" \"addr\": n, (numeric) The total bytes sent aggregated by message type\n"
106108
" ...\n"
@@ -166,6 +168,8 @@ UniValue getpeerinfo(const JSONRPCRequest& request)
166168
heights.push_back(height);
167169
}
168170
obj.pushKV("inflight", heights);
171+
obj.pushKV("addr_processed", statestats.m_addr_processed);
172+
obj.pushKV("addr_rate_limited", statestats.m_addr_rate_limited);
169173
}
170174
obj.pushKV("whitelisted", stats.fWhitelisted);
171175

test/functional/p2p_addr_relay.py

Lines changed: 88 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,20 +6,20 @@
66
Test addr relay
77
"""
88

9-
import time
10-
119
from test_framework.messages import (
1210
CAddress,
1311
NODE_NETWORK,
1412
msg_addr,
1513
)
1614
from test_framework.mininode import (
1715
P2PInterface,
16+
mininode_lock
1817
)
1918
from test_framework.test_framework import PivxTestFramework
20-
from test_framework.util import (
21-
assert_equal,
22-
)
19+
from test_framework.util import assert_equal
20+
21+
import random
22+
import time
2323

2424
ADDRS = []
2525
for i in range(10):
@@ -32,17 +32,33 @@
3232

3333

3434
class AddrReceiver(P2PInterface):
35+
_tokens = 10
3536
def on_addr(self, message):
3637
for addr in message.addrs:
3738
assert_equal(addr.nServices, 1)
3839
assert addr.ip.startswith('123.123.123.')
3940
assert (8333 <= addr.port < 8343)
4041

42+
def on_getaddr(self, message):
43+
# When the node sends us a getaddr, it increments the addr relay tokens for the connection by 1000
44+
self._tokens += 1000
45+
46+
@property
47+
def tokens(self):
48+
with mininode_lock:
49+
return self._tokens
50+
51+
def increment_tokens(self, n):
52+
# When we move mocktime forward, the node increments the addr relay tokens for its peers
53+
with mininode_lock:
54+
self._tokens += n
55+
4156

4257
class AddrTest(PivxTestFramework):
4358
def set_test_params(self):
4459
self.setup_clean_chain = False
4560
self.num_nodes = 1
61+
self.extra_args = [["-whitelist=127.0.0.1"]]
4662

4763
def run_test(self):
4864
self.log.info('Create connection that sends addr messages')
@@ -57,6 +73,8 @@ def run_test(self):
5773
self.log.info('Check that addr message content is relayed and added to addrman')
5874
addr_receiver = self.nodes[0].add_p2p_connection(AddrReceiver())
5975
msg.addrs = ADDRS
76+
self.mocktime += 10
77+
self.nodes[0].setmocktime(self.mocktime)
6078
with self.nodes[0].assert_debug_log([
6179
'Added 10 addresses from 127.0.0.1: 0 tried',
6280
'received: addr (301 bytes) peer=0',
@@ -65,6 +83,71 @@ def run_test(self):
6583
addr_source.send_and_ping(msg)
6684
self.nodes[0].setmocktime(int(time.time()) + 30 * 60)
6785
addr_receiver.sync_with_ping()
86+
self.rate_limit_tests()
87+
88+
def setup_rand_addr_msg(self, num):
89+
addrs = []
90+
for i in range(num):
91+
addr = CAddress()
92+
addr.time = self.mocktime + i
93+
addr.nServices = NODE_NETWORK
94+
addr.ip = f"{random.randrange(128,169)}.{random.randrange(1,255)}.{random.randrange(1,255)}.{random.randrange(1,255)}"
95+
addr.port = 8333
96+
addrs.append(addr)
97+
msg = msg_addr()
98+
msg.addrs = addrs
99+
return msg
100+
101+
def send_addrs_and_test_rate_limiting(self, peer, new_addrs, total_addrs):
102+
"""Send an addr message and check that the number of addresses processed and rate-limited is as expected"""
103+
104+
peer.send_and_ping(self.setup_rand_addr_msg(new_addrs))
105+
106+
peerinfo = self.nodes[0].getpeerinfo()[2]
107+
addrs_processed = peerinfo['addr_processed']
108+
addrs_rate_limited = peerinfo['addr_rate_limited']
109+
self.log.info(f'addrs_processed = {addrs_processed}, addrs_rate_limited = {addrs_rate_limited}')
110+
self.log.info(f'peer_tokens = {peer.tokens}')
111+
112+
assert_equal(addrs_processed, min(total_addrs, peer.tokens))
113+
assert_equal(addrs_rate_limited, max(0, total_addrs - peer.tokens))
114+
115+
def rate_limit_tests(self):
116+
117+
self.mocktime = int(time.time())
118+
self.nodes[0].setmocktime(self.mocktime)
119+
120+
peer = self.nodes[0].add_p2p_connection(AddrReceiver())
121+
122+
self.log.info(f'Test rate limiting of addr processing for inbound peers')
123+
124+
# Send 600 addresses.
125+
self.send_addrs_and_test_rate_limiting(peer, 600, 600)
126+
127+
# Send 400 more addresses.
128+
self.send_addrs_and_test_rate_limiting(peer, 400, 1000)
129+
130+
# Send 10 more. As we reached the processing limit for nodes, no more addresses should be procesesd.
131+
self.send_addrs_and_test_rate_limiting(peer, 10, 1010)
132+
133+
# Advance the time by 100 seconds, permitting the processing of 10 more addresses.
134+
# Send 200 and verify that 10 are processed.
135+
self.mocktime += 100
136+
self.nodes[0].setmocktime(self.mocktime)
137+
peer.increment_tokens(10)
138+
139+
self.send_addrs_and_test_rate_limiting(peer, 200, 1210)
140+
141+
# Advance the time by 1000 seconds, permitting the processing of 100 more addresses.
142+
# Send 200 and verify that 100 are processed.
143+
self.mocktime += 1000
144+
self.nodes[0].setmocktime(self.mocktime)
145+
peer.increment_tokens(100)
146+
147+
self.send_addrs_and_test_rate_limiting(peer, 200, 1410)
148+
149+
self.nodes[0].disconnect_p2ps()
150+
68151

69152

70153
if __name__ == '__main__':

test/functional/p2p_addrv2_relay.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ def on_addrv2(self, message):
3838
assert_equal(addr.nServices, 1) # NODE_NETWORK
3939
assert addr.ip.startswith('123.123.123.')
4040
assert (8333 <= addr.port < 8343)
41-
self.addrv2_received_and_checked = True
41+
self.addrv2_received_and_checked = True
4242

4343
def wait_for_addrv2(self):
4444
self.wait_until(lambda: "addrv2" in self.last_message)
@@ -62,6 +62,8 @@ def run_test(self):
6262
self.log.info('Check that addrv2 message content is relayed and added to addrman')
6363
addr_receiver = self.nodes[0].add_p2p_connection(AddrReceiver())
6464
msg.addrs = ADDRS
65+
self.mocktime += 10
66+
self.nodes[0].setmocktime(self.mocktime)
6567
with self.nodes[0].assert_debug_log([
6668
'Added 10 addresses from 127.0.0.1: 0 tried',
6769
'received: addrv2 (131 bytes) peer=0',
@@ -70,7 +72,6 @@ def run_test(self):
7072
addr_source.send_and_ping(msg)
7173
self.nodes[0].setmocktime(int(time.time()) + 30 * 60)
7274
addr_receiver.wait_for_addrv2()
73-
7475
assert addr_receiver.addrv2_received_and_checked
7576

7677

test/functional/p2p_invalid_messages.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -174,8 +174,8 @@ def test_addrv2_unrecognized_network(self):
174174
self.test_addrv2('unrecognized network',
175175
[
176176
'received: addrv2 (25 bytes)',
177-
'IP 9.9.9.9 mapped',
178-
'Added 1 addresses',
177+
'Received addr: 2 addresses (2 processed, 0 rate-limited)',
178+
'received: ping (8 bytes)',
179179
],
180180
hex_str_to_bytes(
181181
'02' + # number of entries

test/functional/test_runner.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
'mempool_persist.py', # ~ 417 sec
6363
'p2p_quorum_connect.py', # ~ 400 sec
6464
'wallet_reorgsrestore.py', # ~ 391 sec
65+
'p2p_addr_relay.py', # ~ 380 sec
6566

6667
# vv Tests less than 5m vv
6768
'wallet_hd.py', # ~ 300 sec
@@ -135,7 +136,6 @@
135136
'rpc_decodescript.py', # ~ 50 sec
136137
'rpc_blockchain.py', # ~ 50 sec
137138
'wallet_disable.py', # ~ 50 sec
138-
'p2p_addr_relay.py', # ~ 49 sec
139139
'p2p_addrv2_relay.py', # ~ 49 sec
140140
'wallet_autocombine.py', # ~ 49 sec
141141
'mining_v5_upgrade.py', # ~ 48 sec

0 commit comments

Comments
 (0)