Skip to content

Commit 88777c4

Browse files
committed
net: add Relay() method to struct TxRelay
1 parent 7a44768 commit 88777c4

File tree

1 file changed

+179
-148
lines changed

1 file changed

+179
-148
lines changed

src/net_processing.cpp

Lines changed: 179 additions & 148 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,28 @@ static constexpr size_t MAX_ADDR_PROCESSING_TOKEN_BUCKET{MAX_ADDR_TO_SEND};
179179
/** The compactblocks version we support. See BIP 152. */
180180
static constexpr uint64_t CMPCTBLOCKS_VERSION{2};
181181

182+
namespace {
183+
class CompareInvMempoolOrder
184+
{
185+
CTxMemPool* mp;
186+
bool m_wtxid_relay;
187+
188+
public:
189+
explicit CompareInvMempoolOrder(CTxMemPool* _mempool, bool use_wtxid)
190+
{
191+
mp = _mempool;
192+
m_wtxid_relay = use_wtxid;
193+
}
194+
195+
bool operator()(std::set<uint256>::iterator a, std::set<uint256>::iterator b)
196+
{
197+
/* As std::make_heap produces a max-heap, we want the entries with the
198+
* fewest ancestors/highest fee to sort later. */
199+
return mp->CompareDepthAndScore(*b, *a, m_wtxid_relay);
200+
}
201+
};
202+
} // namespace
203+
182204
// Internal stuff
183205
namespace {
184206
/** Blocks that are in flight, and that are in the queue to be downloaded. */
@@ -311,6 +333,150 @@ struct Peer {
311333
LOCK(m_tx_inventory_mutex);
312334
m_send_mempool = value;
313335
}
336+
337+
void Relay(
338+
CNode* pto,
339+
CRollingBloomFilter& recently_announced_invs,
340+
const bool wtxid_relay,
341+
const std::chrono::microseconds current_time,
342+
const std::chrono::microseconds next_inv_send_time,
343+
const std::chrono::microseconds next_inv_rand_time,
344+
CConnman& connman,
345+
CTxMemPool& mempool,
346+
std::vector<CInv>& vInv,
347+
const CNetMsgMaker& msgMaker,
348+
std::map<uint256, CTransactionRef>& mapRelay,
349+
std::deque<std::pair<std::chrono::microseconds, std::map<uint256, CTransactionRef>::iterator>>& g_relay_expiration)
350+
EXCLUSIVE_LOCKS_REQUIRED(!m_tx_inventory_mutex)
351+
{
352+
AssertLockNotHeld(m_tx_inventory_mutex);
353+
LOCK(m_tx_inventory_mutex);
354+
// Check whether periodic sends should happen
355+
bool fSendTrickle = pto->HasPermission(NetPermissionFlags::NoBan);
356+
if (m_next_inv_send_time < current_time) {
357+
fSendTrickle = true;
358+
if (pto->IsInboundConn()) {
359+
m_next_inv_send_time = next_inv_send_time;
360+
} else {
361+
m_next_inv_send_time = next_inv_rand_time;
362+
}
363+
}
364+
365+
// Time to send but the peer has requested we not relay transactions.
366+
if (fSendTrickle) {
367+
LOCK(m_bloom_filter_mutex);
368+
if (!m_relay_txs) m_tx_inventory_to_send.clear();
369+
}
370+
371+
// Respond to BIP35 mempool requests
372+
if (fSendTrickle && m_send_mempool) {
373+
auto vtxinfo = mempool.infoAll();
374+
m_send_mempool = false;
375+
const CFeeRate filterrate{m_fee_filter_received.load()};
376+
377+
LOCK(m_bloom_filter_mutex);
378+
379+
for (const auto& txinfo : vtxinfo) {
380+
const uint256& hash = wtxid_relay ? txinfo.tx->GetWitnessHash() : txinfo.tx->GetHash();
381+
CInv inv(wtxid_relay ? MSG_WTX : MSG_TX, hash);
382+
m_tx_inventory_to_send.erase(hash);
383+
// Don't send transactions that peers will not put into their mempool
384+
if (txinfo.fee < filterrate.GetFee(txinfo.vsize)) {
385+
continue;
386+
}
387+
if (m_bloom_filter) {
388+
if (!m_bloom_filter->IsRelevantAndUpdate(*txinfo.tx)) continue;
389+
}
390+
m_tx_inventory_known_filter.insert(hash);
391+
// Responses to MEMPOOL requests bypass the m_recently_announced_invs filter.
392+
vInv.push_back(inv);
393+
if (vInv.size() == MAX_INV_SZ) {
394+
connman.PushMessage(pto, msgMaker.Make(NetMsgType::INV, vInv));
395+
vInv.clear();
396+
}
397+
}
398+
m_last_mempool_req = std::chrono::duration_cast<std::chrono::seconds>(current_time);
399+
}
400+
401+
// Determine transactions to relay
402+
if (fSendTrickle) {
403+
// Produce a vector with all candidates for sending
404+
std::vector<std::set<uint256>::iterator> vInvTx;
405+
vInvTx.reserve(m_tx_inventory_to_send.size());
406+
for (std::set<uint256>::iterator it = m_tx_inventory_to_send.begin(); it != m_tx_inventory_to_send.end(); it++) {
407+
vInvTx.push_back(it);
408+
}
409+
const CFeeRate filterrate{m_fee_filter_received.load()};
410+
// Topologically and fee-rate sort the inventory we send for privacy and priority reasons.
411+
// A heap is used so that not all items need sorting if only a few are being sent.
412+
CompareInvMempoolOrder compareInvMempoolOrder(&mempool, wtxid_relay);
413+
std::make_heap(vInvTx.begin(), vInvTx.end(), compareInvMempoolOrder);
414+
// No reason to drain out at many times the network's capacity,
415+
// especially since we have many peers and some will draw much shorter delays.
416+
unsigned int nRelayedTransactions = 0;
417+
LOCK(m_bloom_filter_mutex);
418+
while (!vInvTx.empty() && nRelayedTransactions < INVENTORY_BROADCAST_MAX) {
419+
// Fetch the top element from the heap
420+
std::pop_heap(vInvTx.begin(), vInvTx.end(), compareInvMempoolOrder);
421+
std::set<uint256>::iterator it = vInvTx.back();
422+
vInvTx.pop_back();
423+
uint256 hash = *it;
424+
CInv inv(wtxid_relay ? MSG_WTX : MSG_TX, hash);
425+
// Remove it from the to-be-sent set
426+
m_tx_inventory_to_send.erase(it);
427+
// Check if not in the filter already
428+
if (m_tx_inventory_known_filter.contains(hash)) {
429+
continue;
430+
}
431+
// Not in the mempool anymore? don't bother sending it.
432+
auto txinfo = mempool.info(ToGenTxid(inv));
433+
if (!txinfo.tx) {
434+
continue;
435+
}
436+
auto txid = txinfo.tx->GetHash();
437+
auto wtxid = txinfo.tx->GetWitnessHash();
438+
// Peer told you to not send transactions at that feerate? Don't bother sending it.
439+
if (txinfo.fee < filterrate.GetFee(txinfo.vsize)) {
440+
continue;
441+
}
442+
if (m_bloom_filter && !m_bloom_filter->IsRelevantAndUpdate(*txinfo.tx)) continue;
443+
// Send
444+
recently_announced_invs.insert(hash);
445+
vInv.push_back(inv);
446+
nRelayedTransactions++;
447+
{
448+
// Expire old relay messages
449+
while (!g_relay_expiration.empty() && g_relay_expiration.front().first < current_time) {
450+
mapRelay.erase(g_relay_expiration.front().second);
451+
g_relay_expiration.pop_front();
452+
}
453+
454+
auto ret = mapRelay.emplace(txid, std::move(txinfo.tx));
455+
if (ret.second) {
456+
g_relay_expiration.emplace_back(current_time + RELAY_TX_CACHE_TIME, ret.first);
457+
}
458+
// Add wtxid-based lookup into mapRelay as well, so that peers can request by wtxid
459+
auto ret2 = mapRelay.emplace(wtxid, ret.first->second);
460+
if (ret2.second) {
461+
g_relay_expiration.emplace_back(current_time + RELAY_TX_CACHE_TIME, ret2.first);
462+
}
463+
}
464+
if (vInv.size() == MAX_INV_SZ) {
465+
connman.PushMessage(pto, msgMaker.Make(NetMsgType::INV, vInv));
466+
vInv.clear();
467+
}
468+
m_tx_inventory_known_filter.insert(hash);
469+
if (hash != txid) {
470+
// Insert txid into m_tx_inventory_known_filter, even for
471+
// wtxidrelay peers. This prevents re-adding of
472+
// unconfirmed parents to the recently_announced
473+
// filter, when a child tx is requested. See
474+
// ProcessGetData().
475+
m_tx_inventory_known_filter.insert(txid);
476+
}
477+
}
478+
}
479+
}
314480
};
315481

316482
/* Initializes a TxRelay struct for this peer. Can be called at most once for a peer. */
@@ -4658,27 +4824,6 @@ void PeerManagerImpl::MaybeSendFeefilter(CNode& pto, Peer& peer, std::chrono::mi
46584824
}
46594825
}
46604826

4661-
namespace {
4662-
class CompareInvMempoolOrder
4663-
{
4664-
CTxMemPool* mp;
4665-
bool m_wtxid_relay;
4666-
public:
4667-
explicit CompareInvMempoolOrder(CTxMemPool *_mempool, bool use_wtxid)
4668-
{
4669-
mp = _mempool;
4670-
m_wtxid_relay = use_wtxid;
4671-
}
4672-
4673-
bool operator()(std::set<uint256>::iterator a, std::set<uint256>::iterator b)
4674-
{
4675-
/* As std::make_heap produces a max-heap, we want the entries with the
4676-
* fewest ancestors/highest fee to sort later. */
4677-
return mp->CompareDepthAndScore(*b, *a, m_wtxid_relay);
4678-
}
4679-
};
4680-
} // namespace
4681-
46824827
bool PeerManagerImpl::RejectIncomingTxs(const CNode& peer) const
46834828
{
46844829
// block-relay-only peers may never send txs to us
@@ -4947,133 +5092,19 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
49475092
}
49485093

49495094
if (auto tx_relay = peer->GetTxRelay(); tx_relay != nullptr) {
4950-
LOCK(tx_relay->m_tx_inventory_mutex);
4951-
// Check whether periodic sends should happen
4952-
bool fSendTrickle = pto->HasPermission(NetPermissionFlags::NoBan);
4953-
if (tx_relay->m_next_inv_send_time < current_time) {
4954-
fSendTrickle = true;
4955-
if (pto->IsInboundConn()) {
4956-
tx_relay->m_next_inv_send_time = NextInvToInbounds(current_time, INBOUND_INVENTORY_BROADCAST_INTERVAL);
4957-
} else {
4958-
tx_relay->m_next_inv_send_time = GetExponentialRand(current_time, OUTBOUND_INVENTORY_BROADCAST_INTERVAL);
4959-
}
4960-
}
4961-
4962-
// Time to send but the peer has requested we not relay transactions.
4963-
if (fSendTrickle) {
4964-
LOCK(tx_relay->m_bloom_filter_mutex);
4965-
if (!tx_relay->m_relay_txs) tx_relay->m_tx_inventory_to_send.clear();
4966-
}
4967-
4968-
// Respond to BIP35 mempool requests
4969-
if (fSendTrickle && tx_relay->m_send_mempool) {
4970-
auto vtxinfo = m_mempool.infoAll();
4971-
tx_relay->m_send_mempool = false;
4972-
const CFeeRate filterrate{tx_relay->m_fee_filter_received.load()};
4973-
4974-
LOCK(tx_relay->m_bloom_filter_mutex);
4975-
4976-
for (const auto& txinfo : vtxinfo) {
4977-
const uint256& hash = peer->m_wtxid_relay ? txinfo.tx->GetWitnessHash() : txinfo.tx->GetHash();
4978-
CInv inv(peer->m_wtxid_relay ? MSG_WTX : MSG_TX, hash);
4979-
tx_relay->m_tx_inventory_to_send.erase(hash);
4980-
// Don't send transactions that peers will not put into their mempool
4981-
if (txinfo.fee < filterrate.GetFee(txinfo.vsize)) {
4982-
continue;
4983-
}
4984-
if (tx_relay->m_bloom_filter) {
4985-
if (!tx_relay->m_bloom_filter->IsRelevantAndUpdate(*txinfo.tx)) continue;
4986-
}
4987-
tx_relay->m_tx_inventory_known_filter.insert(hash);
4988-
// Responses to MEMPOOL requests bypass the m_recently_announced_invs filter.
4989-
vInv.push_back(inv);
4990-
if (vInv.size() == MAX_INV_SZ) {
4991-
m_connman.PushMessage(pto, msgMaker.Make(NetMsgType::INV, vInv));
4992-
vInv.clear();
4993-
}
4994-
}
4995-
tx_relay->m_last_mempool_req = std::chrono::duration_cast<std::chrono::seconds>(current_time);
4996-
}
4997-
4998-
// Determine transactions to relay
4999-
if (fSendTrickle) {
5000-
// Produce a vector with all candidates for sending
5001-
std::vector<std::set<uint256>::iterator> vInvTx;
5002-
vInvTx.reserve(tx_relay->m_tx_inventory_to_send.size());
5003-
for (std::set<uint256>::iterator it = tx_relay->m_tx_inventory_to_send.begin(); it != tx_relay->m_tx_inventory_to_send.end(); it++) {
5004-
vInvTx.push_back(it);
5005-
}
5006-
const CFeeRate filterrate{tx_relay->m_fee_filter_received.load()};
5007-
// Topologically and fee-rate sort the inventory we send for privacy and priority reasons.
5008-
// A heap is used so that not all items need sorting if only a few are being sent.
5009-
CompareInvMempoolOrder compareInvMempoolOrder(&m_mempool, peer->m_wtxid_relay);
5010-
std::make_heap(vInvTx.begin(), vInvTx.end(), compareInvMempoolOrder);
5011-
// No reason to drain out at many times the network's capacity,
5012-
// especially since we have many peers and some will draw much shorter delays.
5013-
unsigned int nRelayedTransactions = 0;
5014-
LOCK(tx_relay->m_bloom_filter_mutex);
5015-
while (!vInvTx.empty() && nRelayedTransactions < INVENTORY_BROADCAST_MAX) {
5016-
// Fetch the top element from the heap
5017-
std::pop_heap(vInvTx.begin(), vInvTx.end(), compareInvMempoolOrder);
5018-
std::set<uint256>::iterator it = vInvTx.back();
5019-
vInvTx.pop_back();
5020-
uint256 hash = *it;
5021-
CInv inv(peer->m_wtxid_relay ? MSG_WTX : MSG_TX, hash);
5022-
// Remove it from the to-be-sent set
5023-
tx_relay->m_tx_inventory_to_send.erase(it);
5024-
// Check if not in the filter already
5025-
if (tx_relay->m_tx_inventory_known_filter.contains(hash)) {
5026-
continue;
5027-
}
5028-
// Not in the mempool anymore? don't bother sending it.
5029-
auto txinfo = m_mempool.info(ToGenTxid(inv));
5030-
if (!txinfo.tx) {
5031-
continue;
5032-
}
5033-
auto txid = txinfo.tx->GetHash();
5034-
auto wtxid = txinfo.tx->GetWitnessHash();
5035-
// Peer told you to not send transactions at that feerate? Don't bother sending it.
5036-
if (txinfo.fee < filterrate.GetFee(txinfo.vsize)) {
5037-
continue;
5038-
}
5039-
if (tx_relay->m_bloom_filter && !tx_relay->m_bloom_filter->IsRelevantAndUpdate(*txinfo.tx)) continue;
5040-
// Send
5041-
State(pto->GetId())->m_recently_announced_invs.insert(hash);
5042-
vInv.push_back(inv);
5043-
nRelayedTransactions++;
5044-
{
5045-
// Expire old relay messages
5046-
while (!g_relay_expiration.empty() && g_relay_expiration.front().first < current_time)
5047-
{
5048-
mapRelay.erase(g_relay_expiration.front().second);
5049-
g_relay_expiration.pop_front();
5050-
}
5051-
5052-
auto ret = mapRelay.emplace(txid, std::move(txinfo.tx));
5053-
if (ret.second) {
5054-
g_relay_expiration.emplace_back(current_time + RELAY_TX_CACHE_TIME, ret.first);
5055-
}
5056-
// Add wtxid-based lookup into mapRelay as well, so that peers can request by wtxid
5057-
auto ret2 = mapRelay.emplace(wtxid, ret.first->second);
5058-
if (ret2.second) {
5059-
g_relay_expiration.emplace_back(current_time + RELAY_TX_CACHE_TIME, ret2.first);
5060-
}
5061-
}
5062-
if (vInv.size() == MAX_INV_SZ) {
5063-
m_connman.PushMessage(pto, msgMaker.Make(NetMsgType::INV, vInv));
5064-
vInv.clear();
5065-
}
5066-
tx_relay->m_tx_inventory_known_filter.insert(hash);
5067-
if (hash != txid) {
5068-
// Insert txid into m_tx_inventory_known_filter, even for
5069-
// wtxidrelay peers. This prevents re-adding of
5070-
// unconfirmed parents to the recently_announced
5071-
// filter, when a child tx is requested. See
5072-
// ProcessGetData().
5073-
tx_relay->m_tx_inventory_known_filter.insert(txid);
5074-
}
5075-
}
5076-
}
5095+
tx_relay->Relay(
5096+
pto,
5097+
State(pto->GetId())->m_recently_announced_invs,
5098+
peer->m_wtxid_relay,
5099+
current_time,
5100+
NextInvToInbounds(current_time, INBOUND_INVENTORY_BROADCAST_INTERVAL),
5101+
GetExponentialRand(current_time, OUTBOUND_INVENTORY_BROADCAST_INTERVAL),
5102+
m_connman,
5103+
m_mempool,
5104+
vInv,
5105+
msgMaker,
5106+
mapRelay,
5107+
g_relay_expiration);
50775108
}
50785109
if (!vInv.empty())
50795110
m_connman.PushMessage(pto, msgMaker.Make(NetMsgType::INV, vInv));

0 commit comments

Comments
 (0)