Skip to content

Commit 6ce4568

Browse files
authored
Query and store blob transactions into txpool (#3375)
* Query and store blob transactions into txpool Also a breakdown of transaction broadcast. But this time it's only handle incoming transactions, both regular and blob transactions. No rebroadcast whatsoever. It is a good change to let this incoming transactions handling become stable before we add the real broadcast things. * Additional validation: compare announced vs received stat
1 parent 64d14b3 commit 6ce4568

File tree

11 files changed

+354
-34
lines changed

11 files changed

+354
-34
lines changed

execution_chain/core/tx_pool.nim

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,7 @@ export
6363
tx, # : Transaction
6464
pooledTx, # : PooledTransaction
6565
id, # : Hash32
66-
sender, # : Address
67-
nextFork # : EVMFork
66+
sender # : Address
6867

6968
# ------------------------------------------------------------------------------
7069
# TxPoolRef constructor
@@ -95,13 +94,15 @@ export
9594
export
9695
addTx,
9796
getItem,
97+
contains,
9898
removeTx,
9999
removeExpiredTxs,
100100
getBlobAndProofV1,
101101
getBlobAndProofV2
102102

103103
# addTx(xp: TxPoolRef, ptx: PooledTransaction): Result[void, TxError]
104104
# addTx(xp: TxPoolRef, tx: Transaction): Result[void, TxError]
105+
# contains(xp: TxPoolRef, id: Hash32): bool
105106
# getItem(xp: TxPoolRef, id: Hash32): Result[TxItemRef, TxError]
106107
# removeTx(xp: TxPoolRef, id: Hash32)
107108
# removeExpiredTxs(xp: TxPoolRef, lifeTime: Duration)

execution_chain/core/tx_pool/tx_desc.nim

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -162,9 +162,6 @@ func baseFee(xp: TxPoolRef): GasInt =
162162
else:
163163
0.GasInt
164164

165-
func gasLimit(xp: TxPoolRef): GasInt =
166-
xp.vmState.blockCtx.gasLimit
167-
168165
func excessBlobGas(xp: TxPoolRef): GasInt =
169166
xp.vmState.blockCtx.excessBlobGas
170167

@@ -312,6 +309,8 @@ proc updateVmState*(xp: TxPoolRef) =
312309
# ------------------------------------------------------------------------------
313310
# Public functions
314311
# ------------------------------------------------------------------------------
312+
proc contains*(xp: TxPoolRef, id: Hash32): bool =
313+
xp.idTab.hasKey(id)
315314

316315
proc getItem*(xp: TxPoolRef, id: Hash32): Result[TxItemRef, TxError] =
317316
let item = xp.idTab.getOrDefault(id)

execution_chain/nimbus_desc.nim

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import
1818
./core/tx_pool,
1919
./sync/peers,
2020
./sync/beacon as beacon_sync,
21+
./sync/wire_protocol,
2122
./beacon/beacon_engine,
2223
./common,
2324
./config
@@ -52,6 +53,7 @@ type
5253
beaconSyncRef*: BeaconSyncRef
5354
beaconEngine*: BeaconEngineRef
5455
metricsServer*: MetricsHttpServerRef
56+
wire*: EthWireRef
5557

5658
{.push gcsafe, raises: [].}
5759

@@ -70,6 +72,8 @@ proc stop*(nimbus: NimbusNode, conf: NimbusConf) {.async, gcsafe.} =
7072
waitedFutures.add nimbus.beaconSyncRef.stop()
7173
if nimbus.metricsServer.isNil.not:
7274
waitedFutures.add nimbus.metricsServer.stop()
75+
if nimbus.wire.isNil.not:
76+
waitedFutures.add nimbus.wire.stop()
7377

7478
waitedFutures.add nimbus.fc.stopProcessingQueue()
7579

execution_chain/nimbus_execution_client.nim

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ proc setupP2P(nimbus: NimbusNode, conf: NimbusConf,
114114
rng = nimbus.ctx.rng)
115115

116116
# Add protocol capabilities
117-
nimbus.ethNode.addEthHandlerCapability(nimbus.txPool)
117+
nimbus.wire = nimbus.ethNode.addEthHandlerCapability(nimbus.txPool)
118118

119119
# Always initialise beacon syncer
120120
nimbus.beaconSyncRef = BeaconSyncRef.init(

execution_chain/sync/wire_protocol.nim

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,13 @@
1010
import
1111
./wire_protocol/requester,
1212
./wire_protocol/responder,
13+
./wire_protocol/broadcast,
1314
./wire_protocol/types,
1415
./wire_protocol/setup
1516

1617
export
1718
requester,
1819
responder,
20+
broadcast,
1921
types,
2022
setup
Lines changed: 294 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,294 @@
1+
# nimbus-execution-client
2+
# Copyright (c) 2025 Status Research & Development GmbH
3+
# Licensed under either of
4+
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
5+
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
6+
# at your option.
7+
# This file may not be copied, modified, or distributed except according to
8+
# those terms.
9+
10+
import
11+
std/[tables, sets, times, sequtils, random],
12+
chronos,
13+
chronos/ratelimit,
14+
chronicles,
15+
eth/common/hashes,
16+
eth/common/times,
17+
results,
18+
./types,
19+
./requester,
20+
../../networking/p2p,
21+
../../core/tx_pool,
22+
../../core/pooled_txs_rlp,
23+
../../core/eip4844,
24+
../../core/eip7594,
25+
../../core/chain/forked_chain
26+
27+
logScope:
28+
topics = "tx-broadcast"
29+
30+
const
31+
maxOperationQuota = 1000000
32+
fullReplenishTime = chronos.seconds(5)
33+
POOLED_STORAGE_TIME_LIMIT = initDuration(minutes = 20)
34+
cleanupTicker = chronos.minutes(5)
35+
# https://github.com/ethereum/devp2p/blob/b0c213de97978053a0f62c3ea4d23c0a3d8784bc/caps/eth.md#blockrangeupdate-0x11
36+
blockRangeUpdateTicker = chronos.minutes(2)
37+
SOFT_RESPONSE_LIMIT* = 2 * 1024 * 1024
38+
39+
template awaitQuota(bcParam: EthWireRef, costParam: float, protocolIdParam: string) =
40+
let
41+
wire = bcParam
42+
cost = int(costParam)
43+
protocolId = protocolIdParam
44+
45+
try:
46+
if not wire.quota.tryConsume(cost):
47+
debug "Awaiting broadcast quota", cost = cost, protocolId = protocolId
48+
await wire.quota.consume(cost)
49+
except CancelledError as exc:
50+
raise exc
51+
except CatchableError as exc:
52+
debug "Error while waiting broadcast quota",
53+
cost = cost, protocolId = protocolId, msg = exc.msg
54+
55+
template reqisterAction(wire: EthWireRef, actionDesc: string, body) =
56+
block:
57+
proc actionHandler(): Future[void] {.async: (raises: [CancelledError]).} =
58+
debug "Invoking broadcast action", desc=actionDesc
59+
body
60+
61+
await wire.actionQueue.addLast(actionHandler)
62+
63+
func allowedOpsPerSecondCost(n: int): float =
64+
const replenishRate = (maxOperationQuota / fullReplenishTime.nanoseconds.float)
65+
(replenishRate * 1000000000'f / n.float)
66+
67+
const
68+
txPoolProcessCost = allowedOpsPerSecondCost(1000)
69+
hashLookupCost = allowedOpsPerSecondCost(2000)
70+
blockRangeUpdateCost = allowedOpsPerSecondCost(20)
71+
72+
iterator peers69OrLater(wire: EthWireRef, random: bool = false): Peer =
73+
var peers = newSeqOfCap[Peer](wire.node.numPeers)
74+
for peer in wire.node.peers:
75+
if peer.isNil:
76+
continue
77+
if peer.supports(eth69):
78+
peers.add peer
79+
if random:
80+
shuffle(peers)
81+
for peer in peers:
82+
if peer.connectionState != ConnectionState.Connected:
83+
continue
84+
yield peer
85+
86+
proc syncerRunning*(wire: EthWireRef): bool =
87+
# Disable transactions gossip and processing when
88+
# the syncer is still busy
89+
const
90+
thresholdTime = 3 * 15
91+
92+
let
93+
nowTime = EthTime.now()
94+
headerTime = wire.chain.latestHeader.timestamp
95+
96+
let running = (nowTime - headerTime) > thresholdTime
97+
if running != not wire.gossipEnabled:
98+
wire.gossipEnabled = not running
99+
notice "Transaction broadcast state changed", enabled = wire.gossipEnabled
100+
101+
running
102+
103+
proc handleTransactionsBroadcast*(wire: EthWireRef,
104+
packet: TransactionsPacket,
105+
peer: Peer) {.async: (raises: [CancelledError]).} =
106+
if wire.syncerRunning:
107+
return
108+
109+
if packet.transactions.len == 0:
110+
return
111+
112+
debug "received new transactions",
113+
number = packet.transactions.len
114+
115+
wire.reqisterAction("TxPool consume incoming transactions"):
116+
for tx in packet.transactions:
117+
if tx.txType == TxEip4844:
118+
# Disallow blob transaction broadcast
119+
debug "Protocol Breach: Peer broadcast blob transaction",
120+
remote=peer.remote, clientId=peer.clientId
121+
await peer.disconnect(BreachOfProtocol)
122+
return
123+
124+
wire.txPool.addTx(tx).isOkOr:
125+
continue
126+
127+
awaitQuota(wire, txPoolProcessCost, "adding into txpool")
128+
129+
proc handleTxHashesBroadcast*(wire: EthWireRef,
130+
packet: NewPooledTransactionHashesPacket,
131+
peer: Peer) {.async: (raises: [CancelledError]).} =
132+
if wire.syncerRunning:
133+
return
134+
135+
if packet.txHashes.len == 0:
136+
return
137+
138+
debug "received new pooled tx hashes",
139+
hashes = packet.txHashes.len
140+
141+
if packet.txHashes.len != packet.txSizes.len or
142+
packet.txHashes.len != packet.txTypes.len:
143+
debug "Protocol Breach: new pooled tx hashes invalid params",
144+
hashes = packet.txHashes.len,
145+
sizes = packet.txSizes.len,
146+
types = packet.txTypes.len
147+
await peer.disconnect(BreachOfProtocol)
148+
return
149+
150+
wire.reqisterAction("Handle broadcast transactions hashes"):
151+
var
152+
i = 0
153+
154+
while i < packet.txHashes.len:
155+
var
156+
msg: PooledTransactionsRequest
157+
res: Opt[PooledTransactionsPacket]
158+
sizes: seq[uint64]
159+
types: seq[byte]
160+
sumSize = 0'u64
161+
162+
while i < packet.txHashes.len:
163+
let size = packet.txSizes[i]
164+
if sumSize + size > SOFT_RESPONSE_LIMIT.uint64:
165+
break
166+
167+
let txHash = packet.txHashes[i]
168+
if txHash notin wire.txPool:
169+
msg.txHashes.add txHash
170+
sumSize += size
171+
sizes.add size
172+
types.add packet.txTypes[i]
173+
174+
awaitQuota(wire, hashLookupCost, "check transaction exists in pool")
175+
inc i
176+
177+
try:
178+
res = await peer.getPooledTransactions(msg)
179+
except EthP2PError as exc:
180+
debug "request pooled transactions failed",
181+
msg=exc.msg
182+
183+
if res.isNone:
184+
debug "request pooled transactions get nothing"
185+
return
186+
187+
let
188+
ptx = res.get()
189+
190+
for i, tx in ptx.transactions:
191+
# If we receive any blob transactions missing sidecars, or with
192+
# sidecars that don't correspond to the versioned hashes reported
193+
# in the header, disconnect from the sending peer.
194+
if tx.tx.txType.byte != types[i]:
195+
debug "Protocol Breach: Received transaction with type differ from announced",
196+
remote=peer.remote, clientId=peer.clientId
197+
await peer.disconnect(BreachOfProtocol)
198+
return
199+
200+
let (size, hash) = getEncodedLengthAndHash(tx)
201+
if size.uint64 != sizes[i]:
202+
debug "Protocol Breach: Received transaction with size differ from announced",
203+
remote=peer.remote, clientId=peer.clientId
204+
await peer.disconnect(BreachOfProtocol)
205+
return
206+
207+
if hash != msg.txHashes[i]:
208+
debug "Protocol Breach: Received transaction with hash differ from announced",
209+
remote=peer.remote, clientId=peer.clientId
210+
await peer.disconnect(BreachOfProtocol)
211+
return
212+
213+
if tx.tx.txType == TxEip4844:
214+
if tx.blobsBundle.isNil:
215+
debug "Protocol Breach: Received sidecar-less blob transaction",
216+
remote=peer.remote, clientId=peer.clientId
217+
await peer.disconnect(BreachOfProtocol)
218+
return
219+
220+
if tx.blobsBundle.wrapperVersion == WrapperVersionEIP4844:
221+
validateBlobTransactionWrapper4844(tx).isOkOr:
222+
debug "Protocol Breach: Sidecar validation error", msg=error,
223+
remote=peer.remote, clientId=peer.clientId
224+
await peer.disconnect(BreachOfProtocol)
225+
return
226+
227+
if tx.blobsBundle.wrapperVersion == WrapperVersionEIP7594:
228+
validateBlobTransactionWrapper7594(tx).isOkOr:
229+
debug "Protocol Breach: Sidecar validation error", msg=error,
230+
remote=peer.remote, clientId=peer.clientId
231+
await peer.disconnect(BreachOfProtocol)
232+
return
233+
234+
wire.txPool.addTx(tx).isOkOr:
235+
continue
236+
237+
awaitQuota(wire, txPoolProcessCost, "broadcast transactions hashes")
238+
239+
proc tickerLoop*(wire: EthWireRef) {.async: (raises: [CancelledError]).} =
240+
while true:
241+
let
242+
cleanup = sleepAsync(cleanupTicker)
243+
update = sleepAsync(blockRangeUpdateTicker)
244+
res = await one(cleanup, update)
245+
246+
if res == cleanup:
247+
wire.reqisterAction("Periodical cleanup"):
248+
var expireds: seq[Hash32]
249+
for key, seen in wire.seenTransactions:
250+
if getTime() - seen.lastSeen > POOLED_STORAGE_TIME_LIMIT:
251+
expireds.add key
252+
awaitQuota(wire, hashLookupCost, "broadcast transactions hashes")
253+
254+
for expire in expireds:
255+
wire.seenTransactions.del(expire)
256+
awaitQuota(wire, hashLookupCost, "broadcast transactions hashes")
257+
258+
if res == update:
259+
wire.reqisterAction("Periodical blockRangeUpdate"):
260+
let
261+
packet = BlockRangeUpdatePacket(
262+
earliest: 0,
263+
latest: wire.chain.latestNumber,
264+
latestHash: wire.chain.latestHash,
265+
)
266+
267+
for peer in wire.peers69OrLater:
268+
try:
269+
await peer.blockRangeUpdate(packet)
270+
except EthP2PError as exc:
271+
debug "broadcast block range update failed",
272+
msg=exc.msg
273+
awaitQuota(wire, blockRangeUpdateCost, "broadcast blockRangeUpdate")
274+
275+
proc setupTokenBucket*(): TokenBucket =
276+
TokenBucket.new(maxOperationQuota.int, fullReplenishTime)
277+
278+
proc actionLoop*(wire: EthWireRef) {.async: (raises: [CancelledError]).} =
279+
while true:
280+
let action = await wire.actionQueue.popFirst()
281+
await action()
282+
283+
proc stop*(wire: EthWireRef) {.async: (raises: [CancelledError]).} =
284+
var waitedFutures = @[
285+
wire.tickerHeartbeat.cancelAndWait(),
286+
wire.actionHeartbeat.cancelAndWait(),
287+
]
288+
289+
let
290+
timeout = chronos.seconds(5)
291+
completed = await withTimeout(allFutures(waitedFutures), timeout)
292+
if not completed:
293+
trace "Broadcast.stop(): timeout reached", timeout,
294+
futureErrors = waitedFutures.filterIt(it.error != nil).mapIt(it.error.msg)

0 commit comments

Comments
 (0)