[ETCM-275] Async node processing and downloader removal #759
KonradStaniec merged 14 commits into develop from
Conversation
    if (underlyingMessage.maxHeaders == 1) {
      // pivot block
      sender ! MessageFromPeer(BlockHeaders(Seq(pivotHeader)), peer)
      this
Minor: You don't mutate state, so you could move this after the if statement. It would simplify the logic a little bit.
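A minimal sketch of the suggested refactor, with hypothetical simplified types (the real handler sends an actor message and returns a behaviour, not a `String`):

```scala
// Hypothetical simplified stand-in for the handler; the point is only the
// shape of the refactor, not the real types from the PR.
final case class Handler(name: String) {
  // Before: the `this` return value lives inside the branch.
  def handleBefore(maxHeaders: Int, send: String => Unit): Handler =
    if (maxHeaders == 1) {
      send("pivot block header")
      this
    } else {
      this
    }

  // After: side effect stays in the branch, single return value follows it.
  def handleAfter(maxHeaders: Int, send: String => Unit): Handler = {
    if (maxHeaders == 1) send("pivot block header")
    this
  }
}
```

Since no state is mutated, both versions behave identically; the second just avoids repeating the return value per branch.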
    }
    import akka.pattern.pipe
    // scalastyle:off
    case class RequestFailed(from: Peer, reason: String) extends RequestResult

    sealed trait ProcessingError
    case class Critical(er: CriticalError) extends ProcessingError
What is the difference between Critical and DownloaderError with critical = true?
Critical stops the sync entirely, as the trie is malformed for some reason, while DownloaderError with critical = true only blacklists the peer. I will change the naming here as it can be confusing.
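The distinction described above could be sketched roughly like this (illustrative names only, not the PR's exact types):

```scala
// Hedged sketch: a critical trie error aborts the whole sync, while a
// downloader-side error, even a "critical" one, only blacklists the peer.
sealed trait ProcessingError
final case class CriticalTrieError(reason: String) extends ProcessingError
final case class PeerError(peerId: String, blacklist: Boolean) extends ProcessingError

def react(e: ProcessingError): String = e match {
  case CriticalTrieError(reason)        => s"stop sync: $reason"
  case PeerError(peer, true)            => s"blacklist peer $peer"
  case PeerError(peer, false)           => s"retry with peer $peer"
}
```

Renaming along these lines would make it clear that only the trie-level error is fatal to the sync itself.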
kapke left a comment:
First batch of comments, more will come tomorrow.
    case PeerRequestHandler.RequestFailed(peer, reason) =>
      context unwatch (sender())
      log.debug(s"Request failed to peer {} due to {}", peer.id, reason)
Minor: "Request to peer {} failed due to {}" sounds a bit better IMO
    ): Receive = handleCommonMessages orElse handleRequestResults orElse {
      case Sync if currentState.numberOfPendingRequests > 0 && restartRequested.isEmpty =>
        val freePeers = getFreePeers(currentDownloaderState)
        nodesToProcess.dequeueOption match {
minor: since freePeers is always used with an emptiness check, this could be changed into:
(nodesToProcess.dequeueOption, NonEmptyList.fromList(freePeers)) match { // rest of code
Nice bonuses of that approach:
- no need to use fromListUnsafe
- exhaustiveness checks will work (using `if` in `case` disables the exhaustiveness checker for the given match expression)
great suggestion!
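A self-contained sketch of the suggested pattern. `NonEmptyList` here is a tiny stand-in for cats.data.NonEmptyList, and `step`, `nodesToProcess`, and `freePeers` are simplified illustrative stand-ins for the PR's real code:

```scala
// Minimal stand-in for cats.data.NonEmptyList, just enough for the sketch.
final case class NonEmptyList[A](head: A, tail: List[A])

object NonEmptyList {
  def fromList[A](xs: List[A]): Option[NonEmptyList[A]] = xs match {
    case h :: t => Some(NonEmptyList(h, t))
    case Nil    => None
  }
}

// Matching on both Options at once replaces `if` guards, so the compiler's
// exhaustiveness checker covers every combination of "nodes to process" and
// "free peers".
def step(nodesToProcess: List[String], freePeers: List[Int]): String =
  (nodesToProcess.headOption, NonEmptyList.fromList(freePeers)) match {
    case (Some(node), Some(peers)) => s"assign $node to peer ${peers.head}"
    case (Some(_), None)           => "nodes pending, but no free peers"
    case (None, _)                 => "nothing to process"
  }
```

The `Some(peers)` branch receives a value that is non-empty by construction, so no `fromListUnsafe` call is needed anywhere.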
    requests.foreach(req => requestNodes(req))
    processNodes(newState, currentStats, newDownloaderState, nodes).pipeTo(self)
    context.become(
      syncing(
that syncing handler tracks quite a bit of state now, maybe it makes sense to extract it to some class?
Yep, it will probably make the code clearer.
      currentStats: ProcessingStatistics,
      currentDownloaderState: DownloaderState,
      requestResult: RequestResult
    ): Future[ProcessingResult] = {
Does it make sense then to write this function without Future and wrap it into a Future at the call site?
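The shape of that suggestion, as a minimal sketch with hypothetical simplified types (`process`, `processAsync`, and `ProcessingResult` here are illustrative, not the PR's exact signatures):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical simplified result type.
final case class ProcessingResult(processed: Int)

// The processing logic itself stays synchronous and trivially testable...
def process(responses: List[String]): ProcessingResult =
  ProcessingResult(responses.size)

// ...and the caller decides where the asynchrony boundary sits, by wrapping
// the pure function in Future.apply at the call site.
def processAsync(responses: List[String]): Future[ProcessingResult] =
  Future(process(responses))
```

Keeping `Future` out of the function's own signature means the synchronous core can be unit-tested without an execution context, and only the actor that does `pipeTo(self)` needs the asynchronous wrapper.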
    case class UsefulData(responses: List[SyncResponse]) extends ResponseProcessingResult

    final case class DownloaderState(
Should it go to a separate file now? This one got quite big already.
    ) {
      override def mptStateSavedKeys(): Observable[Either[IterationError, ByteString]] = {
        Observable.repeatEvalF(Task(Right(ByteString(1)))).takeWhile(_ => !loadingFinished)
        Observable.repeat(Right(ByteString(1))).takeWhile(_ => !loadingFinished)
Why not Observable.interval(1.ms).map(_ => Right(ByteString(1))).takeWhile(_ => !loadingFinished)?
I'm not very familiar with monix's internals, but I can imagine that plain repeat gives not much time for other work on the processing thread.
    def idle(processingStatistics: ProcessingStatistics): Receive = {
    def idle(processingStatistics: ProcessingStatistics): Receive = handleCommonMessages orElse {
      case StartSyncingTo(root, bn) =>
        val state1 = startSyncing(root, bn)
startSyncing method is always followed by SyncSchedulerActorState.initial. Maybe that call could be part of startSyncing method?
    // TODO we should probably start sync again from new target block, as current trie is malformed or declare
    // fast sync as failure and start normal sync from scratch
    context.stop(self)
    case DownloaderError(newDownloaderState, peer, description, critical) =>
      onlyPivot: Boolean = false,
      failedNodeRequest: Boolean = false
    ): Unit = {
      val sender = TestProbe()
Why not make it a method on the autopilot? I can see the convenience argument, but this way it would also be easy to send a message to a probe which doesn't have the autopilot installed.
- Move DownloaderState to separate file
- Extract Actor state to separate class
- Call Future.apply at call site
- Improve synccontrollerspec autopilot
kapke left a comment:
LGTM! If needed - I can try to sync to mainnet over the weekend to test.
      restartRequester ! WaitingForNewTargetBlock
      context.become(idle(currentStats.addSaved(currentState.memBatch.size)))
    }
    import akka.pattern.pipe
Very minor - I'd prefer to have this import either locally within the method or at the top of the file.
@kapke any additional syncing testing is appreciated (for now, I have tested it locally and on an EC2 machine)
Description
Changes how nodes are processed and removes StateDownloader.
The speed of processing each batch of nodes is highly volatile, i.e. at the beginning of sync it is quite fast and at the end it is quite slow. To keep a balance between the number of responses queued for processing in the presence of a volatile number of peers, it is necessary to mark a peer as active through the whole mpt node request life cycle.
Only after this whole cycle is the peer marked as free to handle another request. That way we can achieve optimal throughput through the whole state sync. It also guarantees that our depth-first descent won't go too far in breadth.
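The life-cycle rule above can be sketched as follows. `Peer` and `PeerPool` are hypothetical simplified stand-ins (the PR's real DownloaderState tracks considerably more), but they show the key invariant: a peer is busy from request scheduling until its response is fully processed, not merely received.

```scala
final case class Peer(id: String)

// A peer stays "busy" across the whole request life cycle: scheduling,
// network round trip, and processing/persisting the returned nodes.
final case class PeerPool(busy: Set[Peer]) {
  // Only peers with no in-flight work are eligible for new requests.
  def freePeers(all: List[Peer]): List[Peer] = all.filterNot(busy.contains)
  // Mark busy when the request is scheduled...
  def requestSent(peer: Peer): PeerPool = copy(busy = busy + peer)
  // ...and free the peer only once its response has been processed.
  def responseProcessed(peer: Peer): PeerPool = copy(busy = busy - peer)
}
```

Because a peer is not freed at response arrival, the number of responses queued for processing is naturally bounded by the number of peers, which is what keeps throughput stable as per-batch processing time varies.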
Such a design makes a separate StateDownloader unnecessary and even troublesome, as it requires syncing up state between two actors.

Testing

I have already synced it to Mainnet a few times.