Skip to content

Commit 7bbb0f4

Browse files
authored
Stream blocks during import (#2937)
When running the import, blocks are currently loaded in batches into a `seq` and then passed to the importer as such. In reality, blocks are still processed one by one, so the batching offers no performance advantage. It does, however, require the client to waste memory — up to several GB — holding the block sequence while the blocks wait to be processed. This PR introduces a persister that accepts these potentially large blocks one by one, and at the same time removes a number of redundant or unnecessary copies, assignments, and resets that were slowing down the import process in general.
1 parent 06a544a commit 7bbb0f4

File tree

14 files changed

+506
-474
lines changed

14 files changed

+506
-474
lines changed

fluffy/database/era1_db.nim

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,15 +59,19 @@ proc new*(
5959
): Era1DB =
6060
Era1DB(path: path, network: network, accumulator: accumulator)
6161

62-
proc getEthBlock*(db: Era1DB, blockNumber: uint64): Result[Block, string] =
62+
proc getEthBlock*(
63+
db: Era1DB, blockNumber: uint64, res: var Block
64+
): Result[void, string] =
6365
let f = ?db.getEra1File(blockNumber.era)
6466

65-
f.getEthBlock(blockNumber)
67+
f.getEthBlock(blockNumber, res)
6668

67-
proc getBlockTuple*(db: Era1DB, blockNumber: uint64): Result[BlockTuple, string] =
69+
proc getBlockTuple*(
70+
db: Era1DB, blockNumber: uint64, res: var BlockTuple
71+
): Result[void, string] =
6872
let f = ?db.getEra1File(blockNumber.era)
6973

70-
f.getBlockTuple(blockNumber)
74+
f.getBlockTuple(blockNumber, res)
7175

7276
proc getAccumulator*(
7377
db: Era1DB, blockNumber: uint64

fluffy/eth_data/era1.nim

Lines changed: 55 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -180,9 +180,10 @@ func offsetsLen(startNumber: uint64): int =
180180
proc toCompressedRlpBytes(item: auto): seq[byte] =
181181
snappy.encodeFramed(rlp.encode(item))
182182

183-
proc fromCompressedRlpBytes(bytes: openArray[byte], T: type): Result[T, string] =
183+
proc fromCompressedRlpBytes[T](bytes: openArray[byte], v: var T): Result[void, string] =
184184
try:
185-
ok(rlp.decode(decodeFramed(bytes, checkIntegrity = false), T))
185+
v = rlp.decode(decodeFramed(bytes, checkIntegrity = false), T)
186+
ok()
186187
except RlpError as e:
187188
err("Invalid compressed RLP data for " & $T & ": " & e.msg)
188189

@@ -300,32 +301,32 @@ proc skipRecord*(f: Era1File): Result[void, string] =
300301

301302
f[].handle.get().skipRecord()
302303

303-
proc getBlockHeader(f: Era1File): Result[headers.Header, string] =
304+
proc getBlockHeader(f: Era1File, res: var headers.Header): Result[void, string] =
304305
var bytes: seq[byte]
305306

306307
let header = ?f[].handle.get().readRecord(bytes)
307308
if header.typ != CompressedHeader:
308309
return err("Invalid era file: didn't find block header at index position")
309310

310-
fromCompressedRlpBytes(bytes, headers.Header)
311+
fromCompressedRlpBytes(bytes, res)
311312

312-
proc getBlockBody(f: Era1File): Result[BlockBody, string] =
313+
proc getBlockBody(f: Era1File, res: var BlockBody): Result[void, string] =
313314
var bytes: seq[byte]
314315

315316
let header = ?f[].handle.get().readRecord(bytes)
316317
if header.typ != CompressedBody:
317318
return err("Invalid era file: didn't find block body at index position")
318319

319-
fromCompressedRlpBytes(bytes, BlockBody)
320+
fromCompressedRlpBytes(bytes, res)
320321

321-
proc getReceipts(f: Era1File): Result[seq[Receipt], string] =
322+
proc getReceipts(f: Era1File, res: var seq[Receipt]): Result[void, string] =
322323
var bytes: seq[byte]
323324

324325
let header = ?f[].handle.get().readRecord(bytes)
325326
if header.typ != CompressedReceipts:
326327
return err("Invalid era file: didn't find receipts at index position")
327328

328-
fromCompressedRlpBytes(bytes, seq[Receipt])
329+
fromCompressedRlpBytes(bytes, res)
329330

330331
proc getTotalDifficulty(f: Era1File): Result[UInt256, string] =
331332
var bytes: seq[byte]
@@ -339,18 +340,25 @@ proc getTotalDifficulty(f: Era1File): Result[UInt256, string] =
339340

340341
ok(UInt256.fromBytesLE(bytes))
341342

342-
proc getNextEthBlock*(f: Era1File): Result[Block, string] =
343+
proc getNextEthBlock*(f: Era1File, res: var Block): Result[void, string] =
343344
doAssert not isNil(f) and f[].handle.isSome
344345

345-
var
346-
header = ?getBlockHeader(f)
347-
body = ?getBlockBody(f)
346+
var body: BlockBody
347+
?getBlockHeader(f, res.header)
348+
?getBlockBody(f, body)
349+
348350
?skipRecord(f) # receipts
349351
?skipRecord(f) # totalDifficulty
350352

351-
ok(Block.init(move(header), move(body)))
353+
res.transactions = move(body.transactions)
354+
res.uncles = move(body.uncles)
355+
res.withdrawals = move(body.withdrawals)
356+
357+
ok()
352358

353-
proc getEthBlock*(f: Era1File, blockNumber: uint64): Result[Block, string] =
359+
proc getEthBlock*(
360+
f: Era1File, blockNumber: uint64, res: var Block
361+
): Result[void, string] =
354362
doAssert not isNil(f) and f[].handle.isSome
355363
doAssert(
356364
blockNumber >= f[].blockIdx.startNumber and blockNumber <= f[].blockIdx.endNumber,
@@ -361,20 +369,21 @@ proc getEthBlock*(f: Era1File, blockNumber: uint64): Result[Block, string] =
361369

362370
?f[].handle.get().setFilePos(pos, SeekPosition.SeekBegin).mapErr(ioErrorMsg)
363371

364-
getNextEthBlock(f)
372+
getNextEthBlock(f, res)
365373

366-
proc getNextBlockTuple*(f: Era1File): Result[BlockTuple, string] =
374+
proc getNextBlockTuple*(f: Era1File, res: var BlockTuple): Result[void, string] =
367375
doAssert not isNil(f) and f[].handle.isSome
368376

369-
let
370-
blockHeader = ?getBlockHeader(f)
371-
blockBody = ?getBlockBody(f)
372-
receipts = ?getReceipts(f)
373-
totalDifficulty = ?getTotalDifficulty(f)
377+
?getBlockHeader(f, res.header)
378+
?getBlockBody(f, res.body)
379+
?getReceipts(f, res.receipts)
380+
res.td = ?getTotalDifficulty(f)
374381

375-
ok((blockHeader, blockBody, receipts, totalDifficulty))
382+
ok()
376383

377-
proc getBlockTuple*(f: Era1File, blockNumber: uint64): Result[BlockTuple, string] =
384+
proc getBlockTuple*(
385+
f: Era1File, blockNumber: uint64, res: var BlockTuple
386+
): Result[void, string] =
378387
doAssert not isNil(f) and f[].handle.isSome
379388
doAssert(
380389
blockNumber >= f[].blockIdx.startNumber and blockNumber <= f[].blockIdx.endNumber,
@@ -385,9 +394,11 @@ proc getBlockTuple*(f: Era1File, blockNumber: uint64): Result[BlockTuple, string
385394

386395
?f[].handle.get().setFilePos(pos, SeekPosition.SeekBegin).mapErr(ioErrorMsg)
387396

388-
getNextBlockTuple(f)
397+
getNextBlockTuple(f, res)
389398

390-
proc getBlockHeader*(f: Era1File, blockNumber: uint64): Result[headers.Header, string] =
399+
proc getBlockHeader*(
400+
f: Era1File, blockNumber: uint64, res: var headers.Header
401+
): Result[void, string] =
391402
doAssert not isNil(f) and f[].handle.isSome
392403
doAssert(
393404
blockNumber >= f[].blockIdx.startNumber and blockNumber <= f[].blockIdx.endNumber,
@@ -398,7 +409,7 @@ proc getBlockHeader*(f: Era1File, blockNumber: uint64): Result[headers.Header, s
398409

399410
?f[].handle.get().setFilePos(pos, SeekPosition.SeekBegin).mapErr(ioErrorMsg)
400411

401-
getBlockHeader(f)
412+
getBlockHeader(f, res)
402413

403414
proc getTotalDifficulty*(f: Era1File, blockNumber: uint64): Result[UInt256, string] =
404415
doAssert not isNil(f) and f[].handle.isSome
@@ -445,13 +456,13 @@ proc buildAccumulator*(f: Era1File): Result[EpochRecordCached, string] =
445456
endNumber = f.blockIdx.endNumber()
446457

447458
var headerRecords: seq[HeaderRecord]
459+
var header: headers.Header
448460
for blockNumber in startNumber .. endNumber:
449-
let
450-
blockHeader = ?f.getBlockHeader(blockNumber)
451-
totalDifficulty = ?f.getTotalDifficulty(blockNumber)
461+
?f.getBlockHeader(blockNumber, header)
462+
let totalDifficulty = ?f.getTotalDifficulty(blockNumber)
452463

453464
headerRecords.add(
454-
HeaderRecord(blockHash: blockHeader.rlpHash(), totalDifficulty: totalDifficulty)
465+
HeaderRecord(blockHash: header.rlpHash(), totalDifficulty: totalDifficulty)
455466
)
456467

457468
ok(EpochRecordCached.init(headerRecords))
@@ -462,25 +473,26 @@ proc verify*(f: Era1File): Result[Digest, string] =
462473
endNumber = f.blockIdx.endNumber()
463474

464475
var headerRecords: seq[HeaderRecord]
476+
var blockTuple: BlockTuple
465477
for blockNumber in startNumber .. endNumber:
478+
?f.getBlockTuple(blockNumber, blockTuple)
466479
let
467-
(blockHeader, blockBody, receipts, totalDifficulty) =
468-
?f.getBlockTuple(blockNumber)
469-
470-
txRoot = calcTxRoot(blockBody.transactions)
471-
ommershHash = rlpHash(blockBody.uncles)
480+
txRoot = calcTxRoot(blockTuple.body.transactions)
481+
ommershHash = rlpHash(blockTuple.body.uncles)
472482

473-
if blockHeader.txRoot != txRoot:
483+
if blockTuple.header.txRoot != txRoot:
474484
return err("Invalid transactions root")
475485

476-
if blockHeader.ommersHash != ommershHash:
486+
if blockTuple.header.ommersHash != ommershHash:
477487
return err("Invalid ommers hash")
478488

479-
if blockHeader.receiptsRoot != calcReceiptsRoot(receipts):
489+
if blockTuple.header.receiptsRoot != calcReceiptsRoot(blockTuple.receipts):
480490
return err("Invalid receipts root")
481491

482492
headerRecords.add(
483-
HeaderRecord(blockHash: blockHeader.rlpHash(), totalDifficulty: totalDifficulty)
493+
HeaderRecord(
494+
blockHash: blockTuple.header.rlpHash(), totalDifficulty: blockTuple.td
495+
)
484496
)
485497

486498
let expectedRoot = ?f.getAccumulatorRoot()
@@ -496,17 +508,17 @@ iterator era1BlockHeaders*(f: Era1File): headers.Header =
496508
startNumber = f.blockIdx.startNumber
497509
endNumber = f.blockIdx.endNumber()
498510

511+
var header: headers.Header
499512
for blockNumber in startNumber .. endNumber:
500-
let header = f.getBlockHeader(blockNumber).valueOr:
501-
raiseAssert("Failed to read block header: " & error)
513+
f.getBlockHeader(blockNumber, header).expect("Header can be read")
502514
yield header
503515

504516
iterator era1BlockTuples*(f: Era1File): BlockTuple =
505517
let
506518
startNumber = f.blockIdx.startNumber
507519
endNumber = f.blockIdx.endNumber()
508520

521+
var blockTuple: BlockTuple
509522
for blockNumber in startNumber .. endNumber:
510-
let blockTuple = f.getBlockTuple(blockNumber).valueOr:
511-
raiseAssert("Failed to read block tuple: " & error)
523+
f.getBlockTuple(blockNumber, blockTuple).expect("Block tuple can be read")
512524
yield blockTuple

fluffy/tools/portal_bridge/portal_bridge_history.nim

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -374,14 +374,15 @@ proc runBackfillLoopAuditMode(
374374
rng = newRng()
375375
db = Era1DB.new(era1Dir, "mainnet", loadAccumulator())
376376

377+
var blockTuple: BlockTuple
377378
while true:
378379
let
379380
# Grab a random blockNumber to audit and potentially gossip
380381
blockNumber = rng[].rand(network_metadata.mergeBlockNumber - 1).uint64
381-
(header, body, receipts, _) = db.getBlockTuple(blockNumber).valueOr:
382-
error "Failed to get block tuple", error, blockNumber
383-
continue
384-
blockHash = header.rlpHash()
382+
db.getBlockTuple(blockNumber, blockTuple).isOkOr:
383+
error "Failed to get block tuple", error, blockNumber
384+
continue
385+
let blockHash = blockTuple.header.rlpHash()
385386

386387
var headerSuccess, bodySuccess, receiptsSuccess = false
387388

@@ -441,7 +442,7 @@ proc runBackfillLoopAuditMode(
441442
error "Invalid hex for block body content", error = e.msg
442443
break bodyBlock
443444

444-
validateBlockBodyBytes(content, header).isOkOr:
445+
validateBlockBodyBytes(content, blockTuple.header).isOkOr:
445446
error "Block body is invalid", error
446447
break bodyBlock
447448

@@ -469,7 +470,7 @@ proc runBackfillLoopAuditMode(
469470
error "Invalid hex for block receipts content", error = e.msg
470471
break receiptsBlock
471472

472-
validateReceiptsBytes(content, header.receiptsRoot).isOkOr:
473+
validateReceiptsBytes(content, blockTuple.header.receiptsRoot).isOkOr:
473474
error "Block receipts are invalid", error
474475
break receiptsBlock
475476

@@ -481,17 +482,21 @@ proc runBackfillLoopAuditMode(
481482
let
482483
epochRecord = db.getAccumulator(blockNumber).valueOr:
483484
raiseAssert "Failed to get accumulator from EraDB: " & error
484-
headerWithProof = buildHeaderWithProof(header, epochRecord).valueOr:
485+
headerWithProof = buildHeaderWithProof(blockTuple.header, epochRecord).valueOr:
485486
raiseAssert "Failed to build header with proof: " & error
486487

487488
# gossip block header by hash
488489
await bridge.gossipBlockHeader(blockHash, headerWithProof)
489490
# gossip block header by number
490491
await bridge.gossipBlockHeader(blockNumber, headerWithProof)
491492
if not bodySuccess:
492-
await bridge.gossipBlockBody(blockHash, PortalBlockBodyLegacy.fromBlockBody(body))
493+
await bridge.gossipBlockBody(
494+
blockHash, PortalBlockBodyLegacy.fromBlockBody(blockTuple.body)
495+
)
493496
if not receiptsSuccess:
494-
await bridge.gossipReceipts(blockHash, PortalReceipts.fromReceipts(receipts))
497+
await bridge.gossipReceipts(
498+
blockHash, PortalReceipts.fromReceipts(blockTuple.receipts)
499+
)
495500

496501
await sleepAsync(2.seconds)
497502

hive_integration/nodocker/engine/node.nim

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,6 @@ proc setBlock*(c: ChainRef; blk: Block): Result[void, string] =
9696
let
9797
vmState = c.getVmState(header).valueOr:
9898
return err("no vmstate")
99-
_ = vmState.parent.stateRoot # Check point
10099
? vmState.processBlock(blk)
101100

102101
? c.db.persistHeaderAndSetHead(header, c.com.startOfHistory)

hive_integration/nodocker/rpc/rpc_sim.nim

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
# Nimbus
2-
# Copyright (c) 2021 Status Research & Development GmbH
2+
# Copyright (c) 2021-2024 Status Research & Development GmbH
33
# Licensed under either of
44
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
55
# * MIT license ([LICENSE-MIT](LICENSE-MIT))

0 commit comments

Comments (0)