|
| 1 | +# nimbus-execution-client |
| 2 | +# Copyright (c) 2025 Status Research & Development GmbH |
| 3 | +# Licensed under either of |
| 4 | +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) |
| 5 | +# * MIT license ([LICENSE-MIT](LICENSE-MIT)) |
| 6 | +# at your option. |
| 7 | +# This file may not be copied, modified, or distributed except according to |
| 8 | +# those terms. |
| 9 | + |
| 10 | +import |
| 11 | + std/[tables, sets, times, sequtils, random], |
| 12 | + chronos, |
| 13 | + chronos/ratelimit, |
| 14 | + chronicles, |
| 15 | + eth/common/hashes, |
| 16 | + eth/common/times, |
| 17 | + results, |
| 18 | + ./types, |
| 19 | + ./requester, |
| 20 | + ../../networking/p2p, |
| 21 | + ../../core/tx_pool, |
| 22 | + ../../core/pooled_txs_rlp, |
| 23 | + ../../core/eip4844, |
| 24 | + ../../core/eip7594, |
| 25 | + ../../core/chain/forked_chain |
| 26 | + |
| 27 | +logScope: |
| 28 | + topics = "tx-broadcast" |
| 29 | + |
| 30 | +const |
| 31 | + maxOperationQuota = 1000000 |
| 32 | + fullReplenishTime = chronos.seconds(5) |
| 33 | + POOLED_STORAGE_TIME_LIMIT = initDuration(minutes = 20) |
| 34 | + cleanupTicker = chronos.minutes(5) |
| 35 | + # https://github.com/ethereum/devp2p/blob/b0c213de97978053a0f62c3ea4d23c0a3d8784bc/caps/eth.md#blockrangeupdate-0x11 |
| 36 | + blockRangeUpdateTicker = chronos.minutes(2) |
| 37 | + SOFT_RESPONSE_LIMIT* = 2 * 1024 * 1024 |
| 38 | + |
| 39 | +template awaitQuota(bcParam: EthWireRef, costParam: float, protocolIdParam: string) = |
| 40 | + let |
| 41 | + wire = bcParam |
| 42 | + cost = int(costParam) |
| 43 | + protocolId = protocolIdParam |
| 44 | + |
| 45 | + try: |
| 46 | + if not wire.quota.tryConsume(cost): |
| 47 | + debug "Awaiting broadcast quota", cost = cost, protocolId = protocolId |
| 48 | + await wire.quota.consume(cost) |
| 49 | + except CancelledError as exc: |
| 50 | + raise exc |
| 51 | + except CatchableError as exc: |
| 52 | + debug "Error while waiting broadcast quota", |
| 53 | + cost = cost, protocolId = protocolId, msg = exc.msg |
| 54 | + |
| 55 | +template reqisterAction(wire: EthWireRef, actionDesc: string, body) = |
| 56 | + block: |
| 57 | + proc actionHandler(): Future[void] {.async: (raises: [CancelledError]).} = |
| 58 | + debug "Invoking broadcast action", desc=actionDesc |
| 59 | + body |
| 60 | + |
| 61 | + await wire.actionQueue.addLast(actionHandler) |
| 62 | + |
| 63 | +func allowedOpsPerSecondCost(n: int): float = |
| 64 | + const replenishRate = (maxOperationQuota / fullReplenishTime.nanoseconds.float) |
| 65 | + (replenishRate * 1000000000'f / n.float) |
| 66 | + |
| 67 | +const |
| 68 | + txPoolProcessCost = allowedOpsPerSecondCost(1000) |
| 69 | + hashLookupCost = allowedOpsPerSecondCost(2000) |
| 70 | + blockRangeUpdateCost = allowedOpsPerSecondCost(20) |
| 71 | + |
| 72 | +iterator peers69OrLater(wire: EthWireRef, random: bool = false): Peer = |
| 73 | + var peers = newSeqOfCap[Peer](wire.node.numPeers) |
| 74 | + for peer in wire.node.peers: |
| 75 | + if peer.isNil: |
| 76 | + continue |
| 77 | + if peer.supports(eth69): |
| 78 | + peers.add peer |
| 79 | + if random: |
| 80 | + shuffle(peers) |
| 81 | + for peer in peers: |
| 82 | + if peer.connectionState != ConnectionState.Connected: |
| 83 | + continue |
| 84 | + yield peer |
| 85 | + |
| 86 | +proc syncerRunning*(wire: EthWireRef): bool = |
| 87 | + # Disable transactions gossip and processing when |
| 88 | + # the syncer is still busy |
| 89 | + const |
| 90 | + thresholdTime = 3 * 15 |
| 91 | + |
| 92 | + let |
| 93 | + nowTime = EthTime.now() |
| 94 | + headerTime = wire.chain.latestHeader.timestamp |
| 95 | + |
| 96 | + let running = (nowTime - headerTime) > thresholdTime |
| 97 | + if running != not wire.gossipEnabled: |
| 98 | + wire.gossipEnabled = not running |
| 99 | + notice "Transaction broadcast state changed", enabled = wire.gossipEnabled |
| 100 | + |
| 101 | + running |
| 102 | + |
| 103 | +proc handleTransactionsBroadcast*(wire: EthWireRef, |
| 104 | + packet: TransactionsPacket, |
| 105 | + peer: Peer) {.async: (raises: [CancelledError]).} = |
| 106 | + if wire.syncerRunning: |
| 107 | + return |
| 108 | + |
| 109 | + if packet.transactions.len == 0: |
| 110 | + return |
| 111 | + |
| 112 | + debug "received new transactions", |
| 113 | + number = packet.transactions.len |
| 114 | + |
| 115 | + wire.reqisterAction("TxPool consume incoming transactions"): |
| 116 | + for tx in packet.transactions: |
| 117 | + if tx.txType == TxEip4844: |
| 118 | + # Disallow blob transaction broadcast |
| 119 | + debug "Protocol Breach: Peer broadcast blob transaction", |
| 120 | + remote=peer.remote, clientId=peer.clientId |
| 121 | + await peer.disconnect(BreachOfProtocol) |
| 122 | + return |
| 123 | + |
| 124 | + wire.txPool.addTx(tx).isOkOr: |
| 125 | + continue |
| 126 | + |
| 127 | + awaitQuota(wire, txPoolProcessCost, "adding into txpool") |
| 128 | + |
| 129 | +proc handleTxHashesBroadcast*(wire: EthWireRef, |
| 130 | + packet: NewPooledTransactionHashesPacket, |
| 131 | + peer: Peer) {.async: (raises: [CancelledError]).} = |
| 132 | + if wire.syncerRunning: |
| 133 | + return |
| 134 | + |
| 135 | + if packet.txHashes.len == 0: |
| 136 | + return |
| 137 | + |
| 138 | + debug "received new pooled tx hashes", |
| 139 | + hashes = packet.txHashes.len |
| 140 | + |
| 141 | + if packet.txHashes.len != packet.txSizes.len or |
| 142 | + packet.txHashes.len != packet.txTypes.len: |
| 143 | + debug "Protocol Breach: new pooled tx hashes invalid params", |
| 144 | + hashes = packet.txHashes.len, |
| 145 | + sizes = packet.txSizes.len, |
| 146 | + types = packet.txTypes.len |
| 147 | + await peer.disconnect(BreachOfProtocol) |
| 148 | + return |
| 149 | + |
| 150 | + wire.reqisterAction("Handle broadcast transactions hashes"): |
| 151 | + var |
| 152 | + i = 0 |
| 153 | + |
| 154 | + while i < packet.txHashes.len: |
| 155 | + var |
| 156 | + msg: PooledTransactionsRequest |
| 157 | + res: Opt[PooledTransactionsPacket] |
| 158 | + sizes: seq[uint64] |
| 159 | + types: seq[byte] |
| 160 | + sumSize = 0'u64 |
| 161 | + |
| 162 | + while i < packet.txHashes.len: |
| 163 | + let size = packet.txSizes[i] |
| 164 | + if sumSize + size > SOFT_RESPONSE_LIMIT.uint64: |
| 165 | + break |
| 166 | + |
| 167 | + let txHash = packet.txHashes[i] |
| 168 | + if txHash notin wire.txPool: |
| 169 | + msg.txHashes.add txHash |
| 170 | + sumSize += size |
| 171 | + sizes.add size |
| 172 | + types.add packet.txTypes[i] |
| 173 | + |
| 174 | + awaitQuota(wire, hashLookupCost, "check transaction exists in pool") |
| 175 | + inc i |
| 176 | + |
| 177 | + try: |
| 178 | + res = await peer.getPooledTransactions(msg) |
| 179 | + except EthP2PError as exc: |
| 180 | + debug "request pooled transactions failed", |
| 181 | + msg=exc.msg |
| 182 | + |
| 183 | + if res.isNone: |
| 184 | + debug "request pooled transactions get nothing" |
| 185 | + return |
| 186 | + |
| 187 | + let |
| 188 | + ptx = res.get() |
| 189 | + |
| 190 | + for i, tx in ptx.transactions: |
| 191 | + # If we receive any blob transactions missing sidecars, or with |
| 192 | + # sidecars that don't correspond to the versioned hashes reported |
| 193 | + # in the header, disconnect from the sending peer. |
| 194 | + if tx.tx.txType.byte != types[i]: |
| 195 | + debug "Protocol Breach: Received transaction with type differ from announced", |
| 196 | + remote=peer.remote, clientId=peer.clientId |
| 197 | + await peer.disconnect(BreachOfProtocol) |
| 198 | + return |
| 199 | + |
| 200 | + let (size, hash) = getEncodedLengthAndHash(tx) |
| 201 | + if size.uint64 != sizes[i]: |
| 202 | + debug "Protocol Breach: Received transaction with size differ from announced", |
| 203 | + remote=peer.remote, clientId=peer.clientId |
| 204 | + await peer.disconnect(BreachOfProtocol) |
| 205 | + return |
| 206 | + |
| 207 | + if hash != msg.txHashes[i]: |
| 208 | + debug "Protocol Breach: Received transaction with hash differ from announced", |
| 209 | + remote=peer.remote, clientId=peer.clientId |
| 210 | + await peer.disconnect(BreachOfProtocol) |
| 211 | + return |
| 212 | + |
| 213 | + if tx.tx.txType == TxEip4844: |
| 214 | + if tx.blobsBundle.isNil: |
| 215 | + debug "Protocol Breach: Received sidecar-less blob transaction", |
| 216 | + remote=peer.remote, clientId=peer.clientId |
| 217 | + await peer.disconnect(BreachOfProtocol) |
| 218 | + return |
| 219 | + |
| 220 | + if tx.blobsBundle.wrapperVersion == WrapperVersionEIP4844: |
| 221 | + validateBlobTransactionWrapper4844(tx).isOkOr: |
| 222 | + debug "Protocol Breach: Sidecar validation error", msg=error, |
| 223 | + remote=peer.remote, clientId=peer.clientId |
| 224 | + await peer.disconnect(BreachOfProtocol) |
| 225 | + return |
| 226 | + |
| 227 | + if tx.blobsBundle.wrapperVersion == WrapperVersionEIP7594: |
| 228 | + validateBlobTransactionWrapper7594(tx).isOkOr: |
| 229 | + debug "Protocol Breach: Sidecar validation error", msg=error, |
| 230 | + remote=peer.remote, clientId=peer.clientId |
| 231 | + await peer.disconnect(BreachOfProtocol) |
| 232 | + return |
| 233 | + |
| 234 | + wire.txPool.addTx(tx).isOkOr: |
| 235 | + continue |
| 236 | + |
| 237 | + awaitQuota(wire, txPoolProcessCost, "broadcast transactions hashes") |
| 238 | + |
| 239 | +proc tickerLoop*(wire: EthWireRef) {.async: (raises: [CancelledError]).} = |
| 240 | + while true: |
| 241 | + let |
| 242 | + cleanup = sleepAsync(cleanupTicker) |
| 243 | + update = sleepAsync(blockRangeUpdateTicker) |
| 244 | + res = await one(cleanup, update) |
| 245 | + |
| 246 | + if res == cleanup: |
| 247 | + wire.reqisterAction("Periodical cleanup"): |
| 248 | + var expireds: seq[Hash32] |
| 249 | + for key, seen in wire.seenTransactions: |
| 250 | + if getTime() - seen.lastSeen > POOLED_STORAGE_TIME_LIMIT: |
| 251 | + expireds.add key |
| 252 | + awaitQuota(wire, hashLookupCost, "broadcast transactions hashes") |
| 253 | + |
| 254 | + for expire in expireds: |
| 255 | + wire.seenTransactions.del(expire) |
| 256 | + awaitQuota(wire, hashLookupCost, "broadcast transactions hashes") |
| 257 | + |
| 258 | + if res == update: |
| 259 | + wire.reqisterAction("Periodical blockRangeUpdate"): |
| 260 | + let |
| 261 | + packet = BlockRangeUpdatePacket( |
| 262 | + earliest: 0, |
| 263 | + latest: wire.chain.latestNumber, |
| 264 | + latestHash: wire.chain.latestHash, |
| 265 | + ) |
| 266 | + |
| 267 | + for peer in wire.peers69OrLater: |
| 268 | + try: |
| 269 | + await peer.blockRangeUpdate(packet) |
| 270 | + except EthP2PError as exc: |
| 271 | + debug "broadcast block range update failed", |
| 272 | + msg=exc.msg |
| 273 | + awaitQuota(wire, blockRangeUpdateCost, "broadcast blockRangeUpdate") |
| 274 | + |
| 275 | +proc setupTokenBucket*(): TokenBucket = |
| 276 | + TokenBucket.new(maxOperationQuota.int, fullReplenishTime) |
| 277 | + |
| 278 | +proc actionLoop*(wire: EthWireRef) {.async: (raises: [CancelledError]).} = |
| 279 | + while true: |
| 280 | + let action = await wire.actionQueue.popFirst() |
| 281 | + await action() |
| 282 | + |
| 283 | +proc stop*(wire: EthWireRef) {.async: (raises: [CancelledError]).} = |
| 284 | + var waitedFutures = @[ |
| 285 | + wire.tickerHeartbeat.cancelAndWait(), |
| 286 | + wire.actionHeartbeat.cancelAndWait(), |
| 287 | + ] |
| 288 | + |
| 289 | + let |
| 290 | + timeout = chronos.seconds(5) |
| 291 | + completed = await withTimeout(allFutures(waitedFutures), timeout) |
| 292 | + if not completed: |
| 293 | + trace "Broadcast.stop(): timeout reached", timeout, |
| 294 | + futureErrors = waitedFutures.filterIt(it.error != nil).mapIt(it.error.msg) |
0 commit comments