@@ -466,6 +466,26 @@ impl<'a, DB: Blockstore + Send + Sync + 'static, T: Iterator<Item = Tipset> + Un
466466 }
467467 }
468468
469+ fn process_cid < DB : Blockstore > (
470+ cid : Cid ,
471+ db : & DB ,
472+ extract_sender : & Option < flume:: Sender < Cid > > ,
473+ queue : & mut Vec < ( Cid , Option < Vec < u8 > > ) > ,
474+ seen : & Arc < Mutex < CidHashSet > > ,
475+ fail_on_dead_links : bool ,
476+ ) -> anyhow:: Result < ( ) > {
477+ if should_save_block_to_snapshot ( cid) {
478+ if db. has ( & cid) ? {
479+ send ( extract_sender, cid) ?;
480+ } else if fail_on_dead_links {
481+ queue. push ( ( cid, None ) ) ;
482+ } else {
483+ seen. lock ( ) . insert ( cid) ;
484+ }
485+ }
486+ Ok ( ( ) )
487+ }
488+
469489 let stateroot_limit = self . stateroot_limit ;
470490 let fail_on_dead_links = self . fail_on_dead_links ;
471491
@@ -505,38 +525,28 @@ impl<'a, DB: Blockstore + Send + Sync + 'static, T: Iterator<Item = Tipset> + Un
505525 }
506526
507527 // Process block messages.
508- if block. epoch > stateroot_limit
509- && should_save_block_to_snapshot ( block. messages )
510- {
511- if this. db . has ( & block. messages ) ? {
512- send ( this. extract_sender , block. messages ) ?;
513- // This will simply return an error once we reach that item in
514- // the queue.
515- } else if fail_on_dead_links {
516- this. queue . push ( ( block. messages , None ) ) ;
517- } else {
518- // Make sure we update seen here as we don't send the block for
519- // inspection.
520- this. seen . lock ( ) . insert ( block. messages ) ;
521- }
528+ if block. epoch > stateroot_limit {
529+ process_cid (
530+ block. messages ,
531+ this. db ,
532+ & this. extract_sender ,
533+ this. queue ,
534+ this. seen ,
535+ fail_on_dead_links,
536+ ) ?;
522537 }
523538
524539 // Visit the block if it's within required depth. And a special case for `0`
525540 // epoch to match Lotus' implementation.
526- if ( block. epoch == 0 || block. epoch > stateroot_limit)
527- && should_save_block_to_snapshot ( block. state_root )
528- {
529- if this. db . has ( & block. state_root ) ? {
530- send ( this. extract_sender , block. state_root ) ?;
531- // This will simply return an error once we reach that item in
532- // the queue.
533- } else if fail_on_dead_links {
534- this. queue . push ( ( block. state_root , None ) ) ;
535- } else {
536- // Make sure we update seen here as we don't send the block for
537- // inspection.
538- this. seen . lock ( ) . insert ( block. state_root ) ;
539- }
541+ if block. epoch == 0 || block. epoch > stateroot_limit {
542+ process_cid (
543+ block. state_root ,
544+ this. db ,
545+ & this. extract_sender ,
546+ this. queue ,
547+ this. seen ,
548+ fail_on_dead_links,
549+ ) ?;
540550 }
541551 }
542552 }
0 commit comments