[ETCM-213] Reload bloom after restart #742
After merging [ETCM-141] scalafmt, this PR can be updated by git merge.
Just FYI: iterating over 10 million keys and values saved in the db takes around 8s on my machine (SSD drive). There are now around 150M MPT nodes to save. So, taking the worst case into account, i.e. a restart near the finish and ~10s per 10M values, the longest a user will wait to reload the bloom filter is about 150s, which imo is a reasonable time to wait.

Keep in mind that a loading failure (i.e. not loading all nodes from the db) will not make state sync unusable; it will just degrade its performance due to a possibly large number of false positives, i.e. a large number of unnecessary db calls. That is why error handling is done in a best-effort manner: we report the loading failure and the error, but we proceed to start state sync without any loading restarts or recovery. We could in theory save the last loaded key before the failure and try to restart the iterator from that key, but imo it is not worth it.

Also, a bloom filter of 150M keys with a 3% false positive setting should weigh less than 200 MB.
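As a rough sanity check of the figures quoted above, the arithmetic can be sketched as follows. The object and method names are hypothetical, and the sizing uses the textbook optimal Bloom filter formula (m = -n ln p / (ln 2)^2 bits), which is an assumption: the filter implementation used in this PR may size itself differently.

```scala
object BloomReloadEstimate {
  // Figures taken from the comment above.
  val totalNodes: Long = 150000000L        // ~150M MPT nodes
  val secondsPerTenMillion: Double = 10.0  // pessimistic ~10s per 10M entries
  val falsePositiveRate: Double = 0.03     // 3% false positive setting

  // Worst-case reload time: every node has to be re-read from the db.
  def reloadSeconds: Double =
    totalNodes / 10000000.0 * secondsPerTenMillion

  // Textbook optimal Bloom filter size in bits: m = -n * ln(p) / (ln 2)^2.
  // Assumption: the real filter may round up or use a different layout.
  def optimalBits(n: Long, p: Double): Double =
    -n * math.log(p) / (math.log(2) * math.log(2))

  // Same size expressed in megabytes (10^6 bytes).
  def optimalMegabytes(n: Long, p: Double): Double =
    optimalBits(n, p) / 8 / 1e6
}
```

Under these assumptions the reload takes 150s in the worst case and the filter needs roughly 137 MB, which is consistent with the "less than 200 MB" estimate.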
a1de42a to e9833d1
mmrozek left a comment
Nice job. One minor comment only
private def moveIterator(it: RocksIterator): Observable[Either[IterationError, (Array[Byte], Array[Byte])]] = {
  Observable
    .fromTask(Task(it.seekToFirst()))
    .flatMap(_ => Observable.fromTask(Task(it.isValid)))
    .flatMap { valid =>
      if (!valid) {
        Observable.empty
      } else {
        Observable.fromTask(Task(Right(it.key(), it.value()))) ++ Observable
          .repeatEvalF {
            Task {
              it.next()
            }.flatMap { _ =>
              if (it.isValid) {
                Task(Right(it.key(), it.value()))
              } else {
                Task.raiseError(IterationFinished)
              }
            }
          }
          .onErrorHandleWith {
            case IterationFinished => Observable.empty
            case ex => Observable(Left(IterationError(ex)))
          }
      }
    }
}
Maybe something like this?
private def moveIterator(it: RocksIterator): Observable[Either[IterationError, (Array[Byte], Array[Byte])]] = {
  Observable
    .fromTask(Task(it.seekToFirst()))
    .flatMap(_ => Observable.fromTask(Task(it.isValid)))
    .filter(identity)
    .flatMap { _ =>
      Observable.fromTask(Task(Right(it.key(), it.value()))) ++ Observable
        .repeatEvalF {
          Task {
            it.next()
          }.flatMap { _ =>
            Task(it.isValid).flatMap {
              case true => Task(Right(it.key(), it.value()))
              case false => Task.raiseError(IterationFinished)
            }
          }
        }
        .onErrorHandleWith {
          case IterationFinished => Observable.empty
          case ex => Observable(Left(IterationError(ex)))
        }
    }
}
Yep, the second it.isValid should probably be in a Task. I will refactor the whole bit in repeatEvalF into a for-comprehension.
override def keySerializer: String => IndexedSeq[Byte] = _.getBytes
override def keyDeserializer: IndexedSeq[Byte] => String = b => new String(b.toArray)
Same as in the app state storage: I'd add an explicit charset here.
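To illustrate the suggestion: without an explicit charset, both getBytes and new String fall back to the JVM's platform default, so keys written on one machine may decode differently on another. A minimal sketch with an explicit charset, where the object name ExplicitCharsetKeys is hypothetical and not part of this PR:

```scala
import java.nio.charset.StandardCharsets

// Hypothetical holder object, for illustration only.
object ExplicitCharsetKeys {
  // Encode keys with a fixed charset instead of the platform default.
  val keySerializer: String => IndexedSeq[Byte] =
    _.getBytes(StandardCharsets.UTF_8).toIndexedSeq

  // Decode with the same charset so the round trip is stable across machines.
  val keyDeserializer: IndexedSeq[Byte] => String =
    b => new String(b.toArray, StandardCharsets.UTF_8)
}
```

With both directions pinned to UTF-8, a key containing non-ASCII characters round-trips unchanged regardless of the file.encoding setting of the JVM.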
val namespace: IndexedSeq[Byte] = Namespaces.KnownNodesNamespace
def keySerializer: String => IndexedSeq[Byte] = _.getBytes
def keyDeserializer: IndexedSeq[Byte] => String = k => new String(k.toArray)
"LoadableBloomFilter" should "load all correct elements " in testCaseM[Task] {
  for {
    source <- Task(Observable.fromIterable(Seq(Right(1L), Right(2L), Right(3L))))
    filter <- Task.now(LoadableBloomFilter[Long](1000, source))
why not filter = LoadableBloomFilter(...) then? (and below too)
To be honest, constantly switching between = and <- just makes the code unreadable for me, but sure, I will change it 👍
override def buildBlockChain(): BlockchainImpl = {
  val storages = getNewStorages
  // iterating over 1M keys and values should force the scheduler actor to enqueue the last received command, i.e. StartSyncing
  (0 until 1000000).foreach { i =>
What about overriding storages so that the source for the bloom filter returns an observable which doesn't emit elements until some signal?
Refactor source iterator
Add docs to iterate methods
Add explicit charset to storages
import java.nio.charset.Charset

object StorageStringCharset {
  val UTF8Charset = Charset.forName("UTF-8")
Minor: there is java.nio.charset.StandardCharsets.UTF_8 too
  stateStorage = storages.stateStorage
) {
  override def mptStateSavedKeys(): Observable[Either[IterationError, ByteString]] = {
    Observable.repeatEvalF(Task(Right(ByteString(1)))).takeWhile(_ => !loadingFinished)
Description
Reload the fast sync bloom filter after a node restart.
Proposed Solution
Expose a RocksDB iterator over the node storage column and the code column, and consume it by putting all known keys into the bloom filter.
Important Changes Introduced
LoadableBloomFilter class, which can load data from a provided source