Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/it/scala/io/iohk/ethereum/sync/FastSyncItSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class FastSyncItSpec extends FlatSpecBase with Matchers with BeforeAndAfterAll {
testScheduler.awaitTermination(60.second)
}

"FastSync" should "sync blockchain without state nodes" in customTestCaseResourceM(
it should "sync blockchain without state nodes" in customTestCaseResourceM(
FakePeer.start3FakePeersRes()
) { case (peer1, peer2, peer3) =>
for {
Expand Down Expand Up @@ -228,8 +228,8 @@ class FastSyncItSpec extends FlatSpecBase with Matchers with BeforeAndAfterAll {

_ <- peer4.importBlocksUntil(1100)(IdentityUpdate)

_ <- peer1.connectToPeers(Set(peer2.node, peer3.node, peer4.node))
_ <- peer1.startFastSync().delayExecution(50.milliseconds)
_ <- peer1.connectToPeers(Set(peer2.node, peer3.node, peer4.node)).delayExecution(5.seconds)
_ <- peer1.startFastSync().delayExecution(50.millis)
_ <- peer2.importBlocksUntil(1200)(IdentityUpdate)
_ <- peer1.waitForFastSyncFinish()
} yield {
Expand Down
10 changes: 9 additions & 1 deletion src/it/scala/io/iohk/ethereum/sync/util/CommonFakePeer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,15 @@ import io.iohk.ethereum.network.handshaker.{EtcHandshaker, EtcHandshakerConfigur
import io.iohk.ethereum.network.p2p.EthereumMessageDecoder
import io.iohk.ethereum.network.rlpx.AuthHandshaker
import io.iohk.ethereum.network.rlpx.RLPxConnectionHandler.RLPxConfiguration
import io.iohk.ethereum.network.{EtcPeerManagerActor, ForkResolver, KnownNodesManager, PeerEventBusActor, PeerManagerActor, PeerStatisticsActor, ServerActor}
import io.iohk.ethereum.network.{
EtcPeerManagerActor,
ForkResolver,
KnownNodesManager,
PeerEventBusActor,
PeerManagerActor,
PeerStatisticsActor,
ServerActor
}
import io.iohk.ethereum.nodebuilder.PruningConfigBuilder
import io.iohk.ethereum.sync.util.SyncCommonItSpec._
import io.iohk.ethereum.sync.util.SyncCommonItSpecUtils._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,10 @@ object RegularSyncItSpecUtils {

lazy val checkpointBlockGenerator: CheckpointBlockGenerator = new CheckpointBlockGenerator
lazy val peersClient: ActorRef =
system.actorOf(PeersClient.props(etcPeerManager, peerEventBus, blacklist, testSyncConfig, system.scheduler), "peers-client")
system.actorOf(
PeersClient.props(etcPeerManager, peerEventBus, blacklist, testSyncConfig, system.scheduler),
"peers-client"
)

lazy val ledger: Ledger =
new LedgerImpl(bl, blockchainConfig, syncConfig, buildEthashConsensus(), Scheduler.global)
Expand All @@ -83,7 +86,14 @@ object RegularSyncItSpecUtils {

val broadcasterRef: ActorRef = system.actorOf(
BlockBroadcasterActor
.props(new BlockBroadcast(etcPeerManager), peerEventBus, etcPeerManager, blacklist, syncConfig, system.scheduler),
.props(
new BlockBroadcast(etcPeerManager),
peerEventBus,
etcPeerManager,
blacklist,
syncConfig,
system.scheduler
),
"block-broadcaster"
)

Expand Down
2 changes: 1 addition & 1 deletion src/it/scala/io/iohk/ethereum/txExecTest/ForksTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class ForksTest extends AnyFlatSpec with Matchers {
daoForkConfig = None,
gasTieBreaker = false,
ethCompatibleStorage = true,
treasuryAddress = Address(0),
treasuryAddress = Address(0)
)

val noErrors = a[Right[_, Seq[Receipt]]]
Expand Down
108 changes: 79 additions & 29 deletions src/main/scala/io/iohk/ethereum/blockchain/sync/fast/FastSync.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import scala.util.Random
import scala.collection.mutable
import scala.util.Try
import scala.util.Success
import java.util.concurrent.atomic.AtomicInteger

// scalastyle:off file.size.limit
class FastSync(
Expand Down Expand Up @@ -115,8 +116,11 @@ class FastSync(
}
}

private val actorCounter = new AtomicInteger
private def countActor: Int = actorCounter.incrementAndGet

// scalastyle:off number.of.methods
private class SyncingHandler(initialSyncState: SyncState) {
private class SyncingHandler(initialSyncState: SyncState, var masterPeer: Option[Peer] = None) {

//not part of syncstate as we do not want to persist is.
private var stateSyncRestartRequested = false
Expand All @@ -128,7 +132,6 @@ class FastSync(
private var assignedHandlers: Map[ActorRef, Peer] = Map.empty
private var peerRequestsTime: Map[Peer, Instant] = Map.empty

private var masterPeer: Option[Peer] = None
// TODO ETCM-701 get rid of state and move skeleton download to a separate actor
private val blockHeadersQueue: mutable.Queue[HeaderRange] = mutable.Queue.empty
private var currentSkeletonState: Option[HeaderSkeleton] = None
Expand All @@ -138,13 +141,13 @@ class FastSync(
private var requestedBlockBodies: Map[ActorRef, Seq[ByteString]] = Map.empty
private var requestedReceipts: Map[ActorRef, Seq[ByteString]] = Map.empty

private val syncStateStorageActor = context.actorOf(Props[StateStorageActor](), "state-storage")
private val syncStateStorageActor = context.actorOf(Props[StateStorageActor](), s"$countActor-state-storage")
syncStateStorageActor ! fastSyncStateStorage

private val branchResolver = context.actorOf(
FastSyncBranchResolverActor
.props(self, peerEventBus, etcPeerManager, blockchain, blacklist, syncConfig, appStateStorage, scheduler),
"fast-sync-branch-resolver"
s"$countActor-fast-sync-branch-resolver"
)

private val syncStateScheduler = context.actorOf(
Expand All @@ -157,7 +160,7 @@ class FastSync(
blacklist,
scheduler
),
"state-scheduler"
s"$countActor-state-scheduler"
)

//Delay before starting to persist snapshot. It should be 0, as the presence of it marks that fast sync was started
Expand All @@ -178,7 +181,7 @@ class FastSync(
syncState = syncState.copy(downloadedNodesCount = saved, totalNodesCount = saved + missing)
}

def receive: Receive = handlePeerListMessages orElse handleStatus orElse {
def receive: Receive = handlePeerListMessages orElse handleStatus orElse handleRequestFailure orElse {
case UpdatePivotBlock(reason) => updatePivotBlock(reason)
case WaitingForNewTargetBlock =>
log.info("State sync stopped until receiving new pivot block")
Expand All @@ -190,10 +193,15 @@ class FastSync(
case StateSyncFinished =>
syncState = syncState.copy(stateSyncFinished = true)
processSyncing()
}

def handleRequestFailure: Receive = {
case PeerRequestHandler.RequestFailed(peer, reason) =>
handleRequestFailure(peer, sender(), FastSyncRequestFailed(reason))
case Terminated(ref) if assignedHandlers.contains(ref) =>
handleRequestFailure(assignedHandlers(ref), ref, PeerActorTerminated)
case Terminated(ref) =>
assignedHandlers.get(ref).foreach {
handleRequestFailure(_, ref, PeerActorTerminated)
}
}

// TODO ETCM-701 will be moved to separate actor and refactored
Expand Down Expand Up @@ -340,12 +348,13 @@ class FastSync(
batchFailuresCount += 1
if (batchFailuresCount > fastSyncMaxBatchRetries) {
log.info("Max number of allowed failures reached. Switching branch and master peer.")
handleRewind(header, masterPeer.get, fastSyncBlockValidationN, blacklistDuration)

blockHeadersQueue.dequeueAll(_ => true)

handleRewind(header, masterPeer.get, fastSyncBlockValidationN, blacklistDuration, continueSyncing = false)

// Start branch resolution and wait for response from the FastSyncBranchResolver actor.
context become waitingForBranchResolution
currentSkeletonState = None
Comment thread
AnastasiiaL marked this conversation as resolved.
blockHeadersQueue.dequeueAll(_ => true)
branchResolver ! FastSyncBranchResolverActor.StartBranchResolver
}
}
Expand All @@ -362,19 +371,33 @@ class FastSync(
}
}

private def waitingForBranchResolution: Receive = handleStatus orElse {
private def waitingForBranchResolution: Receive = handleStatus orElse handleRequestFailure orElse {
case FastSyncBranchResolverActor.BranchResolvedSuccessful(firstCommonBlockNumber, newMasterPeer) =>
log.debug(
s"Resolved branch with first common block number $firstCommonBlockNumber for new master peer $newMasterPeer"
)
// Reset the batch failures count
batchFailuresCount = 0

context.children.foreach { child =>
log.debug(s"Unwatching and killing $child")
context.unwatch(child)
child ! PoisonPill
}

// Restart syncing from the valid block available in state.
syncState = syncState.copy(
bestBlockHeaderNumber = firstCommonBlockNumber,
nextBlockToFullyValidate = firstCommonBlockNumber + 1
log.debug("Starting with fresh SyncingHandler")
val syncingHandler = new SyncingHandler(
syncState.copy(
bestBlockHeaderNumber = firstCommonBlockNumber,
nextBlockToFullyValidate = firstCommonBlockNumber + 1,
pivotBlockUpdateFailures = 0
),
masterPeer = Some(newMasterPeer)
)
masterPeer = Some(newMasterPeer)
Comment thread
AnastasiiaL marked this conversation as resolved.
context become receive
processSyncing()
context.become(syncingHandler.receive)
syncingHandler.processSyncing()

case _: FastSyncBranchResolverActor.BranchResolutionFailed =>
// there isn't much we can do if we don't find a branch/peer to continue syncing, so let's try again
branchResolver ! FastSyncBranchResolverActor.StartBranchResolver
Expand All @@ -390,7 +413,8 @@ class FastSync(
log.info("Asking for new pivot block")
val pivotBlockSelector = {
context.actorOf(
PivotBlockSelector.props(etcPeerManager, peerEventBus, syncConfig, scheduler, context.self, blacklist)
PivotBlockSelector.props(etcPeerManager, peerEventBus, syncConfig, scheduler, context.self, blacklist),
s"$countActor-pivot-block-selector-update"
)
}
pivotBlockSelector ! PivotBlockSelector.SelectPivotBlock
Expand All @@ -408,7 +432,7 @@ class FastSync(
}

def waitingForPivotBlockUpdate(updateReason: PivotBlockUpdateReason): Receive =
handlePeerListMessages orElse handleStatus orElse {
handlePeerListMessages orElse handleStatus orElse handleRequestFailure orElse {
case PivotBlockSelector.Result(pivotBlockHeader)
if newPivotIsGoodEnough(pivotBlockHeader, syncState, updateReason) =>
log.info("New pivot block with number {} received", pivotBlockHeader.number)
Expand Down Expand Up @@ -507,7 +531,9 @@ class FastSync(
}

private def removeRequestHandler(handler: ActorRef): Unit = {
log.debug(s"Removing request handler ${handler.path}")
context unwatch handler
skeletonHandler = skeletonHandler.filter(_ != handler)
assignedHandlers -= handler
}

Expand Down Expand Up @@ -560,17 +586,23 @@ class FastSync(
syncState = syncState.updateNextBlockToValidate(header, K, X)
}

private def handleRewind(header: BlockHeader, peer: Peer, N: Int, duration: FiniteDuration): Unit = {
private def handleRewind(
header: BlockHeader,
peer: Peer,
N: Int,
duration: FiniteDuration,
continueSyncing: Boolean = true
): Unit = {
blacklist.add(peer.id, duration, BlockHeaderValidationFailed)
if (header.number <= syncState.safeDownloadTarget) {
discardLastBlocks(header.number, N)
syncState = syncState.updateDiscardedBlocks(header, N)
if (header.number >= syncState.pivotBlock.number) {
updatePivotBlock(LastBlockValidationFailed)
} else {
} else if (continueSyncing) {
processSyncing()
}
} else {
} else if (continueSyncing) {
processSyncing()
}
}
Expand Down Expand Up @@ -685,6 +717,9 @@ class FastSync(
}

private def handleRequestFailure(peer: Peer, handler: ActorRef, reason: BlacklistReason): Unit = {
if (skeletonHandler == Some(handler))
currentSkeletonState = None

removeRequestHandler(handler)

requestedHeaders.get(peer).foreach(blockHeadersQueue.enqueue)
Expand Down Expand Up @@ -816,6 +851,16 @@ class FastSync(

def processSyncing(): Unit = {
FastSyncMetrics.measure(syncState)
log.debug(
"Start of processSyncing: {}",
Map(
"fullySynced" -> fullySynced,
"blockchainDataToDownload" -> blockchainDataToDownload,
"noBlockchainWorkRemaining" -> noBlockchainWorkRemaining,
"stateSyncFinished" -> syncState.stateSyncFinished,
"notInTheMiddleOfUpdate" -> notInTheMiddleOfUpdate
)
)
if (fullySynced) {
finish()
} else {
Expand Down Expand Up @@ -887,8 +932,9 @@ class FastSync(
requestSkeletonHeaders(peer)
} else {
log.debug(
"Nothing to request. Waiting for responses for [{}] sent requests.",
assignedHandlers.size + skeletonHandler.size
"Nothing to request. Waiting for responses from: {} and/or {}",
assignedHandlers.keys,
skeletonHandler
)
}
}
Expand All @@ -911,7 +957,8 @@ class FastSync(
peerEventBus,
requestMsg = GetReceipts(receiptsToGet),
responseMsgCode = Codes.ReceiptsCode
)
),
s"$countActor-peer-request-handler-receipts"
)

context watch handler
Expand All @@ -934,7 +981,8 @@ class FastSync(
peerEventBus,
requestMsg = GetBlockBodies(blockBodiesToGet),
responseMsgCode = Codes.BlockBodiesCode
)
),
s"$countActor-peer-request-handler-block-bodies"
)

context watch handler
Expand Down Expand Up @@ -962,7 +1010,8 @@ class FastSync(
peerEventBus,
requestMsg = GetBlockHeaders(Left(toRequest.from), toRequest.limit, skip = 0, reverse = false),
responseMsgCode = Codes.BlockHeadersCode
)
),
s"$countActor-peer-request-handler-block-headers"
)

context watch handler
Expand Down Expand Up @@ -1026,7 +1075,8 @@ class FastSync(
peerEventBus,
requestMsg = msg,
responseMsgCode = Codes.BlockHeadersCode
)
),
s"$countActor-peer-request-handler-block-headers-skeleton"
)

context watch handler
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,13 @@ class FastSyncBranchResolverActor(

private def waitingForPeerWithHighestBlock: Receive = handlePeerListMessages orElse { case StartBranchResolver =>
getPeerWithHighestBlock match {
case Some(PeerWithInfo(peer, _)) => requestRecentBlockHeaders(peer, blockchain.getBestBlockNumber())
case Some(peerWithInfo @ PeerWithInfo(peer, _)) =>
log.debug(
"Starting branch resolution now with peer {} and block number {}",
peerWithInfo,
blockchain.getBestBlockNumber()
)
requestRecentBlockHeaders(peer, blockchain.getBestBlockNumber())
case None =>
log.info("Waiting for peers, rescheduling StartBranchResolver")
timers.startSingleTimer(RestartTimerKey, StartBranchResolver, 1.second)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class PivotBlockSelector(
} else {
log.info(
"Cannot pick pivot block. Need at least {} peers, but there are only {} which meet the criteria " +
"({} all available at the moment).",
"({} all available at the moment). Best block number = {}",
minPeersToChoosePivotBlock,
correctPeers.size,
peersToDownloadFrom.size,
Expand Down
Loading