[ETCM-105] State sync improvements #715
Conversation
Force-pushed 6fc329e to 542a06a
Force-pushed 1c56ac7 to 8090650
Force-pushed 8090650 to e487563
val etcPeerManager: ActorRef,
val syncConfig: SyncConfig,
implicit val scheduler: Scheduler)
val fastSyncStateStorage: FastSyncStateStorage,
context become syncingHandler.receive
if (syncState.isBlockchainWorkFinished && !syncState.stateSyncFinished) {
  // chain has already been downloaded we can start state sync
  syncingHandler.startStateSync(syncState.targetBlock)
Maybe some log here will be helpful?
if (blockchainDataToDownload)
  processDownloads()
else if (syncState.isBlockchainWorkFinished && !syncState.stateSyncFinished) {
  // TODO we are waiting for state sync to finish
When will this TODO be addressed?
I will add a ticket number there. My plan was to address this in ETCM-103, as there I will add monitoring for a stale target block, and will probably know more about how exactly this syncing loop should look in FastSync.
nextBlockToFullyValidate: BigInt = 1,
targetBlockUpdateFailures: Int = 0,
updatingTargetBlock: Boolean = false) {
targetBlock: BlockHeader,
 * If it would be valuable, it is possible to implement a processor which would gather statistics about duplicated or not-requested data.
 */
def processResponses(state: SchedulerState, responses: List[SyncResponse]): Either[CriticalError, SchedulerState] = {
  def go(currentState: SchedulerState, remaining: Seq[SyncResponse]): Either[CriticalError, SchedulerState] = {
requestType match {
  case SyncStateScheduler.StateNode =>
    import io.iohk.ethereum.network.p2p.messages.PV63.AccountImplicits._
    scala.util.Try(n.value.toArray[Byte].toAccount).toEither.left.map(_ => NotAccountLeafNode).map { account =>
We could add a decoder from RLPEncodable to Account and use n.parsedRlp instead of using Try
I will add a custom apply method for Account to do that, so as not to expose all these details here.
I had in mind that we could use the already parsed LeafNode to avoid unnecessary decoding from bytes
}

private def isRequestAlreadyKnown(state: SchedulerState, req: StateNodeRequest): Boolean = {
  if (state.memBatch.contains(req.nodeHash)) {
Simpler: private def isRequestAlreadyKnown(state: SchedulerState, req: StateNodeRequest): Boolean = state.memBatch.contains(req.nodeHash) || isInDatabase(req)
private val stateNodeRequestComparator = new Comparator[StateNodeRequest] {
  override def compare(o1: StateNodeRequest, o2: StateNodeRequest): Int = {
    if (o1.nodeDepth > o2.nodeDepth) {
Simpler: o2.nodeDepth compare o1.nodeDepth
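For illustration, a minimal sketch of the suggested simplification (StateNodeRequest is reduced to just a depth field here; only the comparator body mirrors the diff):

```scala
import java.util.Comparator

// Sketch only: the real StateNodeRequest has more fields than nodeDepth.
final case class StateNodeRequest(nodeDepth: Int)

// Delegating to Int's own `compare` with the operands swapped yields the
// same descending-by-depth ordering as the hand-written if/else branches.
val stateNodeRequestComparator: Comparator[StateNodeRequest] =
  new Comparator[StateNodeRequest] {
    override def compare(o1: StateNodeRequest, o2: StateNodeRequest): Int =
      o2.nodeDepth.compare(o1.nodeDepth)
  }
```

Deeper nodes sort first with this ordering, so requests for leaves come ahead of their parents.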
Add tailrec annotation
Simplify known node check
Fix formatting in all files
val (newRequests, newState) =
  currentState.assignTasksToPeers(
    NonEmptyList.fromListUnsafe(freePeers.toList),
    Some(newNodesToGet),
    syncConfig.nodesPerRequest
  )
log.info(
  "Creating {} new state node requests. Current request queue size is {}",
  newRequests.size,
  newState.nodesToGet.size
)
newRequests.foreach { request =>
  requestNodes(request)
}
context.become(downloading(scheduler, newState))
It could be extracted to a separate method. It is the same in both cases.
Ahh, you are right, we probably do not need the check for peers message at all.
if (nextRequested == receivedHash) {
  go(requestedRemaining.tail, receivedRemaining.tail, SyncResponse(receivedHash, nextReceived) :: processed)
} else {
  // hash of next element does not match; return what we have processed, and remaining hashes to get
} else {
  val (notReceived, received) = process(requestedHashes.toList, receivedMessage.values.toList)
  if (received.isEmpty) {
    val rescheduleRequestedHashes = notReceived.foldLeft(nodesToGet) { case (map, hash) =>
Simpler (?): nodesToGet ++ notReceived.map(_ -> None)
That is at least two traversals of the notReceived collection though: one to build the pairs, one for the addition to the map. I would leave it as it is.
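To make the trade-off concrete, here is a toy sketch (with a simplified map type; the real nodesToGet maps hashes to request state):

```scala
// Toy stand-ins for the real hash and request-state types.
val nodesToGet: Map[String, Option[Int]] = Map("a" -> Some(1))
val notReceived: List[String] = List("b", "c")

// Single pass over notReceived, inserting into the map as we go.
val viaFold: Map[String, Option[Int]] =
  notReceived.foldLeft(nodesToGet) { case (map, hash) => map + (hash -> None) }

// Shorter, but materialises the pair list first and then traverses it
// again while merging into the map.
val viaConcat: Map[String, Option[Int]] =
  nodesToGet ++ notReceived.map(_ -> None)
```

Both produce the same map; the difference is only in intermediate traversals and allocations.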
// so we can ignore those errors.
sync.processResponses(currentState, nodes) match {
  case Left(value) =>
    log.info(s"Critical error while state syncing ${value}, stopping state sync")
if (parentsToCheck.isEmpty) {
  (currentRequests, currentBatch)
} else {
  val parent = parentsToCheck.head
WDYT about adding some meaningful exception here? e.g. val parent = parentsToCheck.headOption.getOrElse(throw new IllegalStateException("Critical exception. Cannot find parent"))
Sounds like a good idea; it will be instantly known that some invariant has been broken.
ntallar
left a comment
Very minor comments, will continue reviewing tomorrow
# During fast-sync when most up to date block is determined from peers, the actual target block number
# will be decreased by this value
target-block-offset = 128
target-block-offset = 32
I assume this value was taken from geth, right?
It is part of my experiments with fast sync. It seems geth tries to keep an offset equal to 64 blocks, but their sync is much faster and they process a lot more nodes before updating to a new target.
128 is definitely too much, as a large part of the peers keep only 128 blocks of history, so it can happen that they won't have the target block root and our sync will not even start.
}

def getMissingHashes(max: Int): (List[ByteString], SchedulerState) = {
  def go(
/**
 * Default responses processor which ignores duplicated or not-requested hashes, but informs the caller about critical
 * errors.
 * If it would be valuable, it is possible to implement a processor which would gather statistics about duplicated or not-requested data.
Add statistics logging
Remove unnecessary CheckPeers messages from downloader
override def run(sender: ActorRef, msg: Any): AutoPilot = {
  msg match {
    case SendMessage(msg, peer) if msg.underlyingMsg.isInstanceOf[GetNodeData] =>
      val msgToGet = msg.underlyingMsg.asInstanceOf[GetNodeData]
val (scheduler, schedulerBlockchain, schedulerDb) = buildScheduler()
val header = Fixtures.Blocks.ValidBlock.header.copy(stateRoot = worldHash, number = 1)
schedulerBlockchain.storeBlockHeader(header).commit()
var state = scheduler.initState(worldHash).get
Maybe I am missing something, but this is not necessarily a fold, as we do not have any collection to summarise into one value, but rather need to keep processing until some condition holds.
Ouch, I missed the fact that the condition is checked against a different state on each iteration, which makes it much more elaborate to express purely than a while loop.
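The shape being discussed can be sketched as a tail-recursive helper; the types here are illustrative stand-ins, not the actual scheduler state:

```scala
import scala.annotation.tailrec

// Illustrative stand-in for the evolving scheduler state.
final case class LoopState(remaining: Int, processed: Int)

@tailrec
def run(state: LoopState): LoopState =
  if (state.remaining == 0) state // condition re-checked on each new state
  else run(LoopState(state.remaining - 1, state.processed + 1))
```

This is the pure equivalent of a `var state` plus `while` loop: each iteration threads the updated state into the next condition check.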
}
}

private def isRequestAlreadyKnown(state: SchedulerState, req: StateNodeRequest): Boolean = {
Isn't this more of an isResponseAlreadyKnown? To differentiate it from isRequestAlreadyKnownOrResolved.
case n: BranchNode =>
  Right(n.children.collect { case HashNode(childHash) =>
    StateNodeRequest(
      ByteString.fromArrayUnsafe(childHash),
I assume the ByteString.fromArrayUnsafe is here for performance reasons, right?
Yup, it can be used when we know that the provided array won't be mutated (as in this case).
Refactor FastSyncIt tests
Properly close actor system in StateSyncSpec
mmrozek
left a comment
Minor comments only. If it syncs with the mainnet it is ready to merge
}

private def isRequestedHashAlreadyCommitted(state: SchedulerState, req: StateNodeRequest): Boolean = {
  // TODO add bloom filter step before data base to speed things up. Bloomfilter will need to be reloaded after node
Minor: please add JIRA ticket to the comment
case object AlreadyProcessedItem extends NotCriticalError

final case class ProcessingStatistics(duplicatedHashes: Long, notRequestedHashes: Long, saved: Long) {
Do we want to expose ProcessingStatistics as a metric?
It is possible; we could even persist it to keep the stats between shutdowns. I will probably do this when sync is ready for its prime time (i.e. it works with mainnet).
with BeforeAndAfterAll
with ScalaCheckPropertyChecks {

override def afterAll(): Unit = {
Minor: You could use WithActorSystemShutDown trait
with Matchers
with BeforeAndAfterAll {

override def afterAll(): Unit = {
Task.raiseError(new TimeoutException("Task time out after all retries"))
}
}
it should "should update target block and sync this new target block state" in customTestCaseResourceM(
Minor: pivot instead of target
kapke
left a comment
Minor stuff only. LGTM if it syncs with mainnet (I can test Mordor over the weekend if you want to)
newNodes: Option[Seq[ByteString]],
nodesPerPeerCapacity: Int
): (Seq[PeerRequest], DownloaderState) = {
  def go(
import org.scalatest.matchers.must.Matchers
import org.scalatestplus.scalacheck.ScalaCheckPropertyChecks

class SyncSchedulerSpec extends AnyFlatSpec with Matchers with EitherValues with ScalaCheckPropertyChecks {
Minor - names are out of sync (SyncSchedulerSpec, SyncSchedulerState, SyncStateScheduler)
val goodResponse = peerRequest.nodes.toList.take(perPeerCapacity / 2).map(h => hashNodeMap(h))
val badResponse = (200 until 210).map(ByteString(_)).toList
val (result, newState2) = newState1.handleRequestSuccess(requests(0).peer, NodeData(goodResponse ++ badResponse))
assert(result.isInstanceOf[UsefulData])
@kapke It would be great if you could try a Mordor sync. As for mainnet, syncing with it will be possible after the next ticket in line.
Description
PR which does all the grunt work necessary for implementing fast sync restarting at an arbitrary new target block. Notable changes:
- State sync has been taken out of the FastSync actor and split into two separate components: StateSyncScheduler, which traverses the MPT trie in DFS fashion and creates requests for the currently missing nodes, and StateSyncDownloader, which retrieves those nodes from remote peers and provides them to the Scheduler.
- State sync progress is no longer persisted via syncStateStorageActor. That process was highly nondeterministic and could probably lose data (I did not have any test case for it, but it is easy to imagine a situation where the node is killed during node processing with incorrectly updated queues). With this PR, after a restart state sync starts from scratch, i.e. from the already known target block, but it does not request nodes which are already saved in the database.

The order of traversal here is important, as the fact that we have a node at level n implies that we have all subtries at deeper levels, so ultimately we need to traverse only the unknown paths from the root.

The whole solution is heavily influenced by the way Geth handles state sync in fast sync.

Future Tasks
- Enable StateSyncScheduler to request a restart, waiting for the downloader to finish or cancel its download tasks.
- Have the scheduler buffer MissingNodes messages and do the processing in the background. Then, when the first message with downloaderCapacity > 0 arrives, the scheduler would send missing nodes in between processing of MissingNodes messages.

TODO
StateSyncSchedulerto request restart, waiting for downloader to finish or its download tasks.MissingNodesmessages and do the processing in background. Then when the first messge withdownloaderCapacity> 0 arrives, scheduler would send missing nodes in between processing ofMissingNodesmessage.TODO