@@ -47,6 +47,7 @@ use lighthouse_network::{MessageId, NetworkGlobals, PeerId};
4747use logging:: crit;
4848use logging:: TimeLatch ;
4949use parking_lot:: Mutex ;
50+ pub use scheduler:: work_reprocessing_queue;
5051use serde:: { Deserialize , Serialize } ;
5152use slot_clock:: SlotClock ;
5253use std:: cmp;
@@ -73,7 +74,7 @@ use work_reprocessing_queue::{
7374use work_reprocessing_queue:: { IgnoredRpcBlock , QueuedSamplingRequest } ;
7475
7576mod metrics;
76- pub mod work_reprocessing_queue ;
77+ pub mod scheduler ;
7778
7879/// The maximum size of the channel for work events to the `BeaconProcessor`.
7980///
@@ -264,22 +265,16 @@ impl Default for BeaconProcessorConfig {
264265pub struct BeaconProcessorChannels < E : EthSpec > {
265266 pub beacon_processor_tx : BeaconProcessorSend < E > ,
266267 pub beacon_processor_rx : mpsc:: Receiver < WorkEvent < E > > ,
267- pub work_reprocessing_tx : mpsc:: Sender < ReprocessQueueMessage > ,
268- pub work_reprocessing_rx : mpsc:: Receiver < ReprocessQueueMessage > ,
269268}
270269
271270impl < E : EthSpec > BeaconProcessorChannels < E > {
272271 pub fn new ( config : & BeaconProcessorConfig ) -> Self {
273272 let ( beacon_processor_tx, beacon_processor_rx) =
274273 mpsc:: channel ( config. max_work_event_queue_len ) ;
275- let ( work_reprocessing_tx, work_reprocessing_rx) =
276- mpsc:: channel ( config. max_scheduled_work_queue_len ) ;
277274
278275 Self {
279276 beacon_processor_tx : BeaconProcessorSend ( beacon_processor_tx) ,
280277 beacon_processor_rx,
281- work_reprocessing_rx,
282- work_reprocessing_tx,
283278 }
284279 }
285280}
@@ -638,6 +633,7 @@ pub enum Work<E: EthSpec> {
638633 LightClientUpdatesByRangeRequest ( BlockingFn ) ,
639634 ApiRequestP0 ( BlockingOrAsync ) ,
640635 ApiRequestP1 ( BlockingOrAsync ) ,
636+ Reprocess ( ReprocessQueueMessage ) ,
641637}
642638
643639impl < E : EthSpec > fmt:: Debug for Work < E > {
@@ -692,6 +688,7 @@ pub enum WorkType {
692688 LightClientUpdatesByRangeRequest ,
693689 ApiRequestP0 ,
694690 ApiRequestP1 ,
691+ Reprocess ,
695692}
696693
697694impl < E : EthSpec > Work < E > {
@@ -750,6 +747,7 @@ impl<E: EthSpec> Work<E> {
750747 }
751748 Work :: ApiRequestP0 { .. } => WorkType :: ApiRequestP0 ,
752749 Work :: ApiRequestP1 { .. } => WorkType :: ApiRequestP1 ,
750+ Work :: Reprocess { .. } => WorkType :: Reprocess ,
753751 }
754752 }
755753}
@@ -774,7 +772,7 @@ struct InboundEvents<E: EthSpec> {
774772 /// Used by upstream processes to send new work to the `BeaconProcessor`.
775773 event_rx : mpsc:: Receiver < WorkEvent < E > > ,
776774 /// Used internally for queuing work ready to be re-processed.
777- reprocess_work_rx : mpsc:: Receiver < ReadyWork > ,
775+ ready_work_rx : mpsc:: Receiver < ReadyWork > ,
778776}
779777
780778impl < E : EthSpec > Stream for InboundEvents < E > {
@@ -795,7 +793,7 @@ impl<E: EthSpec> Stream for InboundEvents<E> {
795793
796794 // Poll for delayed blocks before polling for new work. It might be the case that a delayed
797795 // block is required to successfully process some new work.
798- match self . reprocess_work_rx . poll_recv ( cx) {
796+ match self . ready_work_rx . poll_recv ( cx) {
799797 Poll :: Ready ( Some ( ready_work) ) => {
800798 return Poll :: Ready ( Some ( InboundEvent :: ReprocessingWork ( ready_work. into ( ) ) ) ) ;
801799 }
@@ -846,8 +844,6 @@ impl<E: EthSpec> BeaconProcessor<E> {
846844 pub fn spawn_manager < S : SlotClock + ' static > (
847845 mut self ,
848846 event_rx : mpsc:: Receiver < WorkEvent < E > > ,
849- work_reprocessing_tx : mpsc:: Sender < ReprocessQueueMessage > ,
850- work_reprocessing_rx : mpsc:: Receiver < ReprocessQueueMessage > ,
851847 work_journal_tx : Option < mpsc:: Sender < & ' static str > > ,
852848 slot_clock : S ,
853849 maximum_gossip_clock_disparity : Duration ,
@@ -935,9 +931,13 @@ impl<E: EthSpec> BeaconProcessor<E> {
935931 // receive them back once they are ready (`ready_work_rx`).
936932 let ( ready_work_tx, ready_work_rx) =
937933 mpsc:: channel :: < ReadyWork > ( self . config . max_scheduled_work_queue_len ) ;
934+
935+ let ( reprocess_work_tx, reprocess_work_rx) =
936+ mpsc:: channel :: < ReprocessQueueMessage > ( self . config . max_scheduled_work_queue_len ) ;
937+
938938 spawn_reprocess_scheduler (
939939 ready_work_tx,
940- work_reprocessing_rx ,
940+ reprocess_work_rx ,
941941 & self . executor ,
942942 Arc :: new ( slot_clock) ,
943943 maximum_gossip_clock_disparity,
@@ -951,7 +951,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
951951 let mut inbound_events = InboundEvents {
952952 idle_rx,
953953 event_rx,
954- reprocess_work_rx : ready_work_rx,
954+ ready_work_rx,
955955 } ;
956956
957957 let enable_backfill_rate_limiting = self . config . enable_backfill_rate_limiting ;
@@ -965,7 +965,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
965965 Some ( InboundEvent :: WorkEvent ( event) ) if enable_backfill_rate_limiting => {
966966 match QueuedBackfillBatch :: try_from ( event) {
967967 Ok ( backfill_batch) => {
968- match work_reprocessing_tx
968+ match reprocess_work_tx
969969 . try_send ( ReprocessQueueMessage :: BackfillSync ( backfill_batch) )
970970 {
971971 Err ( e) => {
@@ -1027,8 +1027,10 @@ impl<E: EthSpec> BeaconProcessor<E> {
10271027 . unwrap_or ( WORKER_FREED ) ;
10281028
10291029 // We don't care if this message was successfully sent, we only use the journal
1030- // during testing.
1031- let _ = work_journal_tx. try_send ( id) ;
1030+ // during testing. We also ignore reprocess messages to ensure our test cases can pass.
1031+ if id != "reprocess" {
1032+ let _ = work_journal_tx. try_send ( id) ;
1033+ }
10321034 }
10331035
10341036 let can_spawn = self . current_workers < self . config . max_workers ;
@@ -1318,6 +1320,14 @@ impl<E: EthSpec> BeaconProcessor<E> {
13181320 let work_type = work. to_type ( ) ;
13191321
13201322 match work {
1323+ Work :: Reprocess ( work_event) => {
1324+ if let Err ( e) = reprocess_work_tx. try_send ( work_event) {
1325+ error ! (
1326+ error = ?e,
1327+ "Failed to reprocess work event"
1328+ )
1329+ }
1330+ }
13211331 _ if can_spawn => self . spawn_worker ( work, idle_tx) ,
13221332 Work :: GossipAttestation { .. } => attestation_queue. push ( work) ,
13231333 // Attestation batches are formed internally within the
@@ -1488,6 +1498,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
14881498 WorkType :: LightClientUpdatesByRangeRequest => lc_update_range_queue. len ( ) ,
14891499 WorkType :: ApiRequestP0 => api_request_p0_queue. len ( ) ,
14901500 WorkType :: ApiRequestP1 => api_request_p1_queue. len ( ) ,
1501+ WorkType :: Reprocess => 0 ,
14911502 } ;
14921503 metrics:: observe_vec (
14931504 & metrics:: BEACON_PROCESSOR_QUEUE_LENGTH ,
@@ -1639,6 +1650,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
16391650 | Work :: LightClientUpdatesByRangeRequest ( process_fn) => {
16401651 task_spawner. spawn_blocking ( process_fn)
16411652 }
1653+ Work :: Reprocess ( _) => { }
16421654 } ;
16431655 }
16441656}
0 commit comments