@@ -12,7 +12,7 @@ use lighthouse_network::{PeerAction, PeerId};
1212use logging:: crit;
1313use std:: collections:: { BTreeMap , HashSet , btree_map:: Entry } ;
1414use strum:: IntoStaticStr ;
15- use tracing:: { debug, warn} ;
15+ use tracing:: { Span , debug, instrument , warn} ;
1616use types:: { ColumnIndex , Epoch , EthSpec , Hash256 , Slot } ;
1717
1818/// Blocks are downloaded in batches from peers. This constant specifies how many epochs worth of
@@ -111,6 +111,9 @@ pub struct SyncingChain<T: BeaconChainTypes> {
111111
112112 /// The current processing batch, if any.
113113 current_processing_batch : Option < BatchId > ,
114+
115+ /// The span to track the lifecycle of the syncing chain.
116+ span : Span ,
114117}
115118
116119#[ derive( PartialEq , Debug ) ]
@@ -123,6 +126,7 @@ pub enum ChainSyncingState {
123126
124127impl < T : BeaconChainTypes > SyncingChain < T > {
125128 #[ allow( clippy:: too_many_arguments) ]
129+ #[ instrument( name = "syncing_chain" , parent = None , level="debug" ) ]
126130 pub fn new (
127131 id : Id ,
128132 start_epoch : Epoch ,
@@ -131,6 +135,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
131135 peer_id : PeerId ,
132136 chain_type : SyncingChainType ,
133137 ) -> Self {
138+ let span = Span :: current ( ) ;
134139 SyncingChain {
135140 id,
136141 chain_type,
@@ -145,6 +150,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
145150 attempted_optimistic_starts : HashSet :: default ( ) ,
146151 state : ChainSyncingState :: Stopped ,
147152 current_processing_batch : None ,
153+ span,
148154 }
149155 }
150156
@@ -185,6 +191,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
185191
186192 /// Removes a peer from the chain.
187193 /// If the peer has active batches, those are considered failed and re-requested.
194+ #[ instrument( parent = & self . span, level="debug" , skip_all, fields( ?peer_id) ) ]
188195 pub fn remove_peer ( & mut self , peer_id : & PeerId ) -> ProcessingResult {
189196 self . peers . remove ( peer_id) ;
190197
@@ -205,6 +212,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
205212
206213 /// A block has been received for a batch on this chain.
207214 /// If the block correctly completes the batch it will be processed if possible.
215+ #[ instrument( parent = & self . span, level="debug" , skip_all) ]
208216 pub fn on_block_response (
209217 & mut self ,
210218 network : & mut SyncNetworkContext < T > ,
@@ -251,6 +259,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
251259
252260 /// Processes the batch with the given id.
253261 /// The batch must exist and be ready for processing
262+ #[ instrument( parent = & self . span, level="debug" , skip_all, fields( ?batch_id) ) ]
254263 fn process_batch (
255264 & mut self ,
256265 network : & mut SyncNetworkContext < T > ,
@@ -298,6 +307,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
298307 }
299308
300309 /// Processes the next ready batch, prioritizing optimistic batches over the processing target.
310+ #[ instrument( parent = & self . span, level="debug" , skip_all) ]
301311 fn process_completed_batches (
302312 & mut self ,
303313 network : & mut SyncNetworkContext < T > ,
@@ -405,6 +415,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
405415
406416 /// The block processor has completed processing a batch. This function handles the result
407417 /// of the batch processor.
418+ #[ instrument( parent = & self . span, level="debug" , skip_all, fields( ?batch_id, ?result) ) ]
408419 pub fn on_batch_process_result (
409420 & mut self ,
410421 network : & mut SyncNetworkContext < T > ,
@@ -756,6 +767,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
756767 /// This chain has been requested to start syncing.
757768 ///
758769 /// This could be new chain, or an old chain that is being resumed.
770+ #[ instrument( parent = & self . span, level="debug" , skip_all, fields( ?local_finalized_epoch, ?optimistic_start_epoch) ) ]
759771 pub fn start_syncing (
760772 & mut self ,
761773 network : & mut SyncNetworkContext < T > ,
@@ -794,6 +806,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
794806 /// Add a peer to the chain.
795807 ///
796808 /// If the chain is active, this starts requesting batches from this peer.
809+ #[ instrument( parent = & self . span, level="debug" , skip_all, fields( ?peer_id) ) ]
797810 pub fn add_peer (
798811 & mut self ,
799812 network : & mut SyncNetworkContext < T > ,
@@ -806,6 +819,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
806819 /// An RPC error has occurred.
807820 ///
808821 /// If the batch exists it is re-requested.
822+ #[ instrument( parent = & self . span, level="debug" , skip_all, fields( ?batch_id, ?peer_id, ?request_id) ) ]
809823 pub fn inject_error (
810824 & mut self ,
811825 network : & mut SyncNetworkContext < T > ,
@@ -901,6 +915,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
901915 }
902916
903917 /// Requests the batch assigned to the given id from a given peer.
918+ #[ instrument( parent = & self . span, level="debug" , skip_all, fields( ?batch_id) ) ]
904919 pub fn send_batch (
905920 & mut self ,
906921 network : & mut SyncNetworkContext < T > ,
@@ -976,7 +991,8 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
976991 }
977992
978993 /// Retries partial column requests within the batch by creating new requests for the failed columns.
979- pub fn retry_partial_batch (
994+ #[ instrument( parent = & self . span, level="debug" , skip_all, fields( ?batch_id, ?id, ?failed_columns) ) ]
995+ fn retry_partial_batch (
980996 & mut self ,
981997 network : & mut SyncNetworkContext < T > ,
982998 batch_id : BatchId ,
@@ -1028,6 +1044,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
10281044
10291045 /// Kickstarts the chain by sending for processing batches that are ready and requesting more
10301046 /// batches if needed.
1047+ #[ instrument( parent = & self . span, level="debug" , skip_all) ]
10311048 pub fn resume (
10321049 & mut self ,
10331050 network : & mut SyncNetworkContext < T > ,
@@ -1040,6 +1057,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
10401057
10411058 /// Attempts to request the next required batches from the peer pool if the chain is syncing. It will exhaust the peer
10421059 /// pool and left over batches until the batch buffer is reached or all peers are exhausted.
1060+ #[ instrument( parent = & self . span, level="debug" , skip_all) ]
10431061 fn request_batches ( & mut self , network : & mut SyncNetworkContext < T > ) -> ProcessingResult {
10441062 if !matches ! ( self . state, ChainSyncingState :: Syncing ) {
10451063 return Ok ( KeepChain ) ;
0 commit comments