@@ -124,6 +124,14 @@ pub enum Notification {
124124 Finalization ( FinalizationNotification ) ,
125125 Reconstruction ,
126126 PruneBlobs ( Epoch ) ,
127+ ManualFinalization ( ManualFinalizationNotification ) ,
128+ }
129+
130+ pub struct ManualFinalizationNotification {
131+ pub state_root : BeaconStateHash ,
132+ pub checkpoint : Checkpoint ,
133+ pub head_tracker : Arc < HeadTracker > ,
134+ pub genesis_block_root : Hash256 ,
127135}
128136
129137pub struct FinalizationNotification {
@@ -190,6 +198,10 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
190198 Ok ( ( ) )
191199 }
192200
201+ pub fn process_manual_finalization ( & self , notif : ManualFinalizationNotification ) {
202+ let _ = self . send_background_notification ( Notification :: ManualFinalization ( notif) ) ;
203+ }
204+
193205 pub fn process_reconstruction ( & self ) {
194206 if let Some ( Notification :: Reconstruction ) =
195207 self . send_background_notification ( Notification :: Reconstruction )
@@ -289,6 +301,103 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
289301 }
290302 }
291303
304+ fn run_manual_migration (
305+ db : Arc < HotColdDB < E , Hot , Cold > > ,
306+ notif : ManualFinalizationNotification ,
307+ log : & Logger ,
308+ ) {
309+ let state_root = notif. state_root ;
310+ let block_root = notif. checkpoint . root ;
311+
312+ let manually_finalized_state = match db. get_state ( & state_root. into ( ) , None ) {
313+ Ok ( Some ( state) ) => state,
314+ other => {
315+ error ! (
316+ log,
317+ "Manual migrator failed to load state" ;
318+ "state_root" => ?state_root,
319+ "error" => ?other
320+ ) ;
321+ return ;
322+ }
323+ } ;
324+
325+ let old_finalized_checkpoint = match Self :: prune_abandoned_forks (
326+ db. clone ( ) ,
327+ notif. head_tracker ,
328+ state_root,
329+ & manually_finalized_state,
330+ notif. checkpoint ,
331+ notif. genesis_block_root ,
332+ log,
333+ ) {
334+ Ok ( PruningOutcome :: Successful {
335+ old_finalized_checkpoint,
336+ } ) => old_finalized_checkpoint,
337+ Ok ( PruningOutcome :: DeferredConcurrentHeadTrackerMutation ) => {
338+ warn ! (
339+ log,
340+ "Pruning deferred because of a concurrent mutation" ;
341+ "message" => "this is expected only very rarely!"
342+ ) ;
343+ return ;
344+ }
345+ Ok ( PruningOutcome :: OutOfOrderFinalization {
346+ old_finalized_checkpoint,
347+ new_finalized_checkpoint,
348+ } ) => {
349+ warn ! (
350+ log,
351+ "Ignoring out of order finalization request" ;
352+ "old_finalized_epoch" => old_finalized_checkpoint. epoch,
353+ "new_finalized_epoch" => new_finalized_checkpoint. epoch,
354+ "message" => "this is expected occasionally due to a (harmless) race condition"
355+ ) ;
356+ return ;
357+ }
358+ Err ( e) => {
359+ warn ! ( log, "Block pruning failed" ; "error" => ?e) ;
360+ return ;
361+ }
362+ } ;
363+
364+ match migrate_database (
365+ db. clone ( ) ,
366+ state_root. into ( ) ,
367+ block_root,
368+ & manually_finalized_state,
369+ ) {
370+ Ok ( ( ) ) => { }
371+ Err ( Error :: HotColdDBError ( HotColdDBError :: FreezeSlotUnaligned ( slot) ) ) => {
372+ debug ! (
373+ log,
374+ "Database migration postponed, unaligned finalized block" ;
375+ "slot" => slot. as_u64( )
376+ ) ;
377+ }
378+ Err ( e) => {
379+ warn ! (
380+ log,
381+ "Database migration failed" ;
382+ "error" => format!( "{:?}" , e)
383+ ) ;
384+ return ;
385+ }
386+ } ;
387+
388+ // Finally, compact the database so that new free space is properly reclaimed.
389+ if let Err ( e) = Self :: run_compaction (
390+ db,
391+ old_finalized_checkpoint. epoch ,
392+ notif. checkpoint . epoch ,
393+ log,
394+ ) {
395+ warn ! ( log, "Database compaction failed" ; "error" => format!( "{:?}" , e) ) ;
396+ }
397+
398+ debug ! ( log, "Database consolidation complete" ) ;
399+ }
400+
292401 /// Perform the actual work of `process_finalization`.
293402 fn run_migration (
294403 db : Arc < HotColdDB < E , Hot , Cold > > ,
@@ -422,16 +531,27 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
422531 while let Ok ( notif) = rx. recv ( ) {
423532 let mut reconstruction_notif = None ;
424533 let mut finalization_notif = None ;
534+ let mut manual_finalization_notif = None ;
425535 let mut prune_blobs_notif = None ;
426536 match notif {
427537 Notification :: Reconstruction => reconstruction_notif = Some ( notif) ,
428538 Notification :: Finalization ( fin) => finalization_notif = Some ( fin) ,
539+ Notification :: ManualFinalization ( fin) => manual_finalization_notif = Some ( fin) ,
429540 Notification :: PruneBlobs ( dab) => prune_blobs_notif = Some ( dab) ,
430541 }
431542 // Read the rest of the messages in the channel, taking the best of each type.
432543 for notif in rx. try_iter ( ) {
433544 match notif {
434545 Notification :: Reconstruction => reconstruction_notif = Some ( notif) ,
546+ Notification :: ManualFinalization ( fin) => {
547+ if let Some ( current) = manual_finalization_notif. as_mut ( ) {
548+ if fin. checkpoint . epoch > current. checkpoint . epoch {
549+ * current = fin;
550+ }
551+ } else {
552+ manual_finalization_notif = Some ( fin) ;
553+ }
554+ }
435555 Notification :: Finalization ( fin) => {
436556 if let Some ( current) = finalization_notif. as_mut ( ) {
437557 if fin. finalized_checkpoint . epoch
@@ -454,6 +574,9 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
454574 if let Some ( fin) = finalization_notif {
455575 Self :: run_migration ( db. clone ( ) , fin, & log) ;
456576 }
577+ if let Some ( fin) = manual_finalization_notif {
578+ Self :: run_manual_migration ( db. clone ( ) , fin, & log) ;
579+ }
457580 if let Some ( dab) = prune_blobs_notif {
458581 Self :: run_prune_blobs ( db. clone ( ) , dab, & log) ;
459582 }
0 commit comments