Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,10 @@ addCommandAlias(
addCommandAlias(
"pp",
""";compile-all
|;test
|;scalafmtAll
|;scalastyle
|;test:scalastyle
|;test
|;it:test
|""".stripMargin
)
17 changes: 14 additions & 3 deletions src/it/scala/io/iohk/ethereum/sync/util/FastSyncItSpecUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package io.iohk.ethereum.sync.util
import akka.util.ByteString
import cats.effect.Resource
import io.iohk.ethereum.Mocks.MockValidatorsAlwaysSucceed
import io.iohk.ethereum.blockchain.sync.FastSync
import io.iohk.ethereum.blockchain.sync.{FastSync, SyncProtocol}
import io.iohk.ethereum.blockchain.sync.FastSync.SyncState
import io.iohk.ethereum.crypto.kec256
import io.iohk.ethereum.domain.Address
Expand Down Expand Up @@ -37,7 +37,7 @@ object FastSyncItSpecUtils {
)

def startFastSync(): Task[Unit] = Task {
fastSync ! FastSync.Start
fastSync ! SyncProtocol.Start
}

def waitForFastSyncFinish(): Task[Boolean] = {
Expand Down Expand Up @@ -96,7 +96,18 @@ object FastSyncItSpecUtils {
val currentBest = bl.getBestBlock().header
val safeTarget = currentBest.number + syncConfig.fastSyncBlockValidationX
val nextToValidate = currentBest.number + 1
val syncState = SyncState(currentBest, safeTarget, Seq(), Seq(), 0, 0, currentBest.number, nextToValidate)
val syncState =
SyncState(
pivotBlock = currentBest,
lastFullBlockNumber = currentBest.number,
safeDownloadTarget = safeTarget,
blockBodiesQueue = Seq(),
receiptsQueue = Seq(),
downloadedNodesCount = 0,
totalNodesCount = 0,
bestBlockHeaderNumber = currentBest.number,
nextBlockToFullyValidate = nextToValidate
)
storagesInstance.storages.fastSyncStateStorage.putSyncState(syncState)
}.map(_ => ())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import akka.actor.ActorRef
import akka.util.ByteString
import cats.effect.Resource
import io.iohk.ethereum.Mocks.MockValidatorsAlwaysSucceed
import io.iohk.ethereum.blockchain.sync.PeersClient
import io.iohk.ethereum.blockchain.sync.{PeersClient, SyncProtocol}
import io.iohk.ethereum.blockchain.sync.regular.BlockBroadcasterActor.BroadcastBlock
import io.iohk.ethereum.blockchain.sync.regular.RegularSync
import io.iohk.ethereum.consensus.blocks.CheckpointBlockGenerator
Expand Down Expand Up @@ -80,7 +80,7 @@ object RegularSyncItSpecUtils {
)

def startRegularSync(): Task[Unit] = Task {
regularSync ! RegularSync.Start
regularSync ! SyncProtocol.Start
}

def broadcastBlock(
Expand Down Expand Up @@ -116,7 +116,7 @@ object RegularSyncItSpecUtils {
val currentWolrd = getMptForBlock(block)
val (newBlock, newTd, newWorld) =
createChildBlock(block, currentTd, currentWolrd, plusDifficulty)(updateWorldForBlock)
regularSync ! RegularSync.MinedBlock(newBlock)
regularSync ! SyncProtocol.MinedBlock(newBlock)
}

def mineNewBlocks(delay: FiniteDuration, nBlocks: Int)(
Expand Down
2 changes: 2 additions & 0 deletions src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,8 @@ mantis {
}

async {
ask-timeout = 100.millis

dispatchers {
block-forger {
type = Dispatcher
Expand Down
139 changes: 79 additions & 60 deletions src/main/scala/io/iohk/ethereum/blockchain/sync/FastSync.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import cats.data.NonEmptyList
import io.iohk.ethereum.blockchain.sync.FastSyncReceiptsValidator.ReceiptsValidationResult
import io.iohk.ethereum.blockchain.sync.PeerRequestHandler.ResponseReceived
import io.iohk.ethereum.blockchain.sync.SyncBlocksValidator.BlockBodyValidationResult
import io.iohk.ethereum.blockchain.sync.SyncProtocol.Status.Progress
import io.iohk.ethereum.blockchain.sync.SyncStateSchedulerActor.{
RestartRequested,
StartSyncingTo,
Expand All @@ -26,7 +27,7 @@ import io.iohk.ethereum.utils.Config.SyncConfig
import org.bouncycastle.util.encoders.Hex
import scala.annotation.tailrec
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.{FiniteDuration, _}
import scala.concurrent.duration._
import scala.util.Random

// scalastyle:off file.size.limit
Expand Down Expand Up @@ -55,8 +56,10 @@ class FastSync(

def handleCommonMessages: Receive = handlePeerListMessages orElse handleBlacklistMessages

def idle: Receive = handleCommonMessages orElse { case Start =>
start()
def idle: Receive = handleCommonMessages orElse {
case SyncProtocol.Start =>
start()
case SyncProtocol.GetStatus => sender() ! SyncProtocol.Status.NotSyncing
}

def start(): Unit = {
Expand All @@ -82,22 +85,24 @@ class FastSync(
context become waitingForPivotBlock
}

def waitingForPivotBlock: Receive = handleCommonMessages orElse { case PivotBlockSelector.Result(pivotBlockHeader) =>
if (pivotBlockHeader.number < 1) {
log.info("Unable to start block synchronization in fast mode: pivot block is less than 1")
appStateStorage.fastSyncDone().commit()
context become idle
syncController ! Done
} else {
val initialSyncState =
SyncState(
pivotBlockHeader,
safeDownloadTarget = pivotBlockHeader.number + syncConfig.fastSyncBlockValidationX
)
val syncingHandler = new SyncingHandler(initialSyncState)
context.become(syncingHandler.receive)
syncingHandler.processSyncing()
}
def waitingForPivotBlock: Receive = handleCommonMessages orElse {
case SyncProtocol.GetStatus => sender() ! SyncProtocol.Status.NotSyncing
case PivotBlockSelector.Result(pivotBlockHeader) =>
if (pivotBlockHeader.number < 1) {
log.info("Unable to start block synchronization in fast mode: pivot block is less than 1")
appStateStorage.fastSyncDone().commit()
context become idle
syncController ! Done
} else {
val initialSyncState =
SyncState(
pivotBlockHeader,
safeDownloadTarget = pivotBlockHeader.number + syncConfig.fastSyncBlockValidationX
)
val syncingHandler = new SyncingHandler(initialSyncState)
context.become(syncingHandler.receive)
syncingHandler.processSyncing()
}
}

// scalastyle:off number.of.methods
Expand Down Expand Up @@ -140,8 +145,14 @@ class FastSync(
private val heartBeat =
scheduler.scheduleWithFixedDelay(syncRetryInterval, syncRetryInterval * 2, self, ProcessSyncing)

def receive: Receive = handleCommonMessages orElse {
case UpdatePivotBlock(state) => updatePivotBlock(state)
def handleStatus: Receive = {
case SyncProtocol.GetStatus => sender() ! currentSyncingStatus
case SyncStateSchedulerActor.StateSyncStats(saved, missing) =>
syncState = syncState.copy(downloadedNodesCount = saved, totalNodesCount = (saved + missing))
}

def receive: Receive = handleCommonMessages orElse handleStatus orElse {
case UpdatePivotBlock(reason) => updatePivotBlock(reason)
case WaitingForNewTargetBlock =>
log.info("State sync stopped until receiving new pivot block")
updatePivotBlock(ImportedLastBlock)
Expand Down Expand Up @@ -200,7 +211,8 @@ class FastSync(

def reScheduleAskForNewPivot(updateReason: PivotBlockUpdateReason): Unit = {
syncState = syncState.copy(pivotBlockUpdateFailures = syncState.pivotBlockUpdateFailures + 1)
scheduler.scheduleOnce(syncConfig.pivotBlockReScheduleInterval, self, UpdatePivotBlock(updateReason))
scheduler
.scheduleOnce(syncConfig.pivotBlockReScheduleInterval, self, UpdatePivotBlock(updateReason))
}

private def stalePivotAfterRestart(
Expand All @@ -219,23 +231,33 @@ class FastSync(
newPivot.number >= currentState.pivotBlock.number && !stalePivotAfterRestart(newPivot, currentState, updateReason)
}

def waitingForPivotBlockUpdate(updateReason: PivotBlockUpdateReason): Receive = handleCommonMessages orElse {
case PivotBlockSelector.Result(pivotBlockHeader)
if newPivotIsGoodEnough(pivotBlockHeader, syncState, updateReason) =>
log.info(s"New pivot block with number ${pivotBlockHeader.number} received")
updatePivotSyncState(updateReason, pivotBlockHeader)
context become this.receive
processSyncing()
def waitingForPivotBlockUpdate(updateReason: PivotBlockUpdateReason): Receive =
handleCommonMessages orElse handleStatus orElse {
case PivotBlockSelector.Result(pivotBlockHeader)
if newPivotIsGoodEnough(pivotBlockHeader, syncState, updateReason) =>
log.info(s"New pivot block with number ${pivotBlockHeader.number} received")
updatePivotSyncState(updateReason, pivotBlockHeader)
context become this.receive
processSyncing()

case PivotBlockSelector.Result(pivotBlockHeader)
if !newPivotIsGoodEnough(pivotBlockHeader, syncState, updateReason) =>
log.info("Received pivot block is older than old one, re-scheduling asking for new one")
reScheduleAskForNewPivot(updateReason)
case PivotBlockSelector.Result(pivotBlockHeader)
if !newPivotIsGoodEnough(pivotBlockHeader, syncState, updateReason) =>
log.info("Received pivot block is older than old one, re-scheduling asking for new one")
reScheduleAskForNewPivot(updateReason)

case PersistSyncState => persistSyncState()
case PersistSyncState => persistSyncState()

case UpdatePivotBlock(state) => updatePivotBlock(state)
}
case UpdatePivotBlock(state) => updatePivotBlock(state)
}

def currentSyncingStatus: SyncProtocol.Status =
SyncProtocol.Status.Syncing(
initialSyncState.lastFullBlockNumber,
Progress(syncState.lastFullBlockNumber, syncState.pivotBlock.number),
Some(
Progress(syncState.downloadedNodesCount, syncState.totalNodesCount.max(1))
) //There's always at least one state root to fetch
)

private def updatePivotBlock(updateReason: PivotBlockUpdateReason): Unit = {
if (syncState.pivotBlockUpdateFailures <= syncConfig.maximumTargetUpdateFailures) {
Expand Down Expand Up @@ -745,24 +767,26 @@ class FastSync(
def fullySynced: Boolean = {
syncState.isBlockchainWorkFinished && assignedHandlers.isEmpty && syncState.stateSyncFinished
}
}

private def updateBestBlockIfNeeded(receivedHashes: Seq[ByteString]): Unit = {
val fullBlocks = receivedHashes.flatMap { hash =>
for {
header <- blockchain.getBlockHeaderByHash(hash)
_ <- blockchain.getBlockBodyByHash(hash)
_ <- blockchain.getReceiptsByHash(hash)
} yield header
}
private def updateBestBlockIfNeeded(receivedHashes: Seq[ByteString]): Unit = {
val fullBlocks = receivedHashes.flatMap { hash =>
for {
header <- blockchain.getBlockHeaderByHash(hash)
_ <- blockchain.getBlockBodyByHash(hash)
_ <- blockchain.getReceiptsByHash(hash)
} yield header
}

if (fullBlocks.nonEmpty) {
val bestReceivedBlock = fullBlocks.maxBy(_.number)
if (appStateStorage.getBestBlockNumber() < bestReceivedBlock.number) {
appStateStorage.putBestBlockNumber(bestReceivedBlock.number).commit()
if (fullBlocks.nonEmpty) {
val bestReceivedBlock = fullBlocks.maxBy(_.number)
val lastStoredBestBlockNumber = appStateStorage.getBestBlockNumber()
if (lastStoredBestBlockNumber < bestReceivedBlock.number) {
appStateStorage.putBestBlockNumber(bestReceivedBlock.number).commit()
}
syncState = syncState.copy(lastFullBlockNumber = bestReceivedBlock.number.max(lastStoredBestBlockNumber))
}
}

}
}
}

Expand Down Expand Up @@ -794,7 +818,7 @@ object FastSync {
)
)

private case class UpdatePivotBlock(state: PivotBlockUpdateReason)
private case class UpdatePivotBlock(reason: PivotBlockUpdateReason)
private case object ProcessSyncing

private[sync] case object PersistSyncState
Expand All @@ -803,11 +827,12 @@ object FastSync {

case class SyncState(
pivotBlock: BlockHeader,
lastFullBlockNumber: BigInt = 0,
safeDownloadTarget: BigInt = 0,
blockBodiesQueue: Seq[ByteString] = Nil,
receiptsQueue: Seq[ByteString] = Nil,
downloadedNodesCount: Int = 0,
totalNodesCount: Int = 0,
downloadedNodesCount: Long = 0,
totalNodesCount: Long = 0,
bestBlockHeaderNumber: BigInt = 0,
nextBlockToFullyValidate: BigInt = 1,
pivotBlockUpdateFailures: Int = 0,
Expand Down Expand Up @@ -846,9 +871,8 @@ object FastSync {
updatingPivotBlock = false
)

def isBlockchainWorkFinished: Boolean = {
def isBlockchainWorkFinished: Boolean =
bestBlockHeaderNumber >= safeDownloadTarget && !blockChainWorkQueued
}
}

sealed trait HashType {
Expand All @@ -860,17 +884,12 @@ object FastSync {
case class EvmCodeHash(v: ByteString) extends HashType
case class StorageRootHash(v: ByteString) extends HashType

case object Start
case object Done

sealed abstract class HeaderProcessingResult

case object HeadersProcessingFinished extends HeaderProcessingResult

case class ParentDifficultyNotFound(header: BlockHeader) extends HeaderProcessingResult

case class ValidationFailed(header: BlockHeader, peer: Peer) extends HeaderProcessingResult

case object ImportedPivotBlock extends HeaderProcessingResult

sealed abstract class PivotBlockUpdateReason {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,11 @@ class SyncController(
) extends Actor
with ActorLogging {

import SyncController._

def scheduler: Scheduler = externalSchedulerOpt getOrElse context.system.scheduler

override def receive: Receive = idle

def idle: Receive = { case Start =>
def idle: Receive = { case SyncProtocol.Start =>
start()
}

Expand Down Expand Up @@ -89,7 +87,7 @@ class SyncController(
),
"fast-sync"
)
fastSync ! FastSync.Start
fastSync ! SyncProtocol.Start
context become runningFastSync(fastSync)
}

Expand All @@ -112,7 +110,7 @@ class SyncController(
),
"regular-sync"
)
regularSync ! RegularSync.Start
regularSync ! SyncProtocol.Start
context become runningRegularSync(regularSync)
}

Expand Down Expand Up @@ -150,6 +148,4 @@ object SyncController {
syncConfig
)
)

case object Start
}
Loading